You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/05/17 21:57:58 UTC
[drill] 03/03: DRILL-6027: - Added fallback option for HashJoin. - No copy of incoming for single partition, and avoid HT resize. - Fix memory leak when cancelling while spill file is read. - Get correct schema when probe side is empty. - Re-create the HashJoinProbe.
This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 89e0fe6b34259a2f51a7c45070935a2a2400eca4
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Fri Apr 27 21:59:25 2018 -0700
DRILL-6027:
- Added fallback option for HashJoin.
- No copy of incoming for single partition, and avoid HT resize.
- Fix a memory leak when cancelling while a spill file is being read.
- Get the correct schema when the probe side is empty.
- Re-create the HashJoinProbe (HashJoinProbe and HashJoinProbeTemplate).
---
.../java/org/apache/drill/exec/ExecConstants.java | 18 +-
.../drill/exec/physical/base/AbstractBase.java | 4 +-
.../drill/exec/physical/base/PhysicalOperator.java | 4 +-
.../drill/exec/physical/config/ExternalSort.java | 4 +-
.../drill/exec/physical/config/HashAggregate.java | 11 +-
.../drill/exec/physical/config/HashJoinPOP.java | 66 +--
.../drill/exec/physical/impl/BaseRootExec.java | 2 -
.../physical/impl/aggregate/HashAggTemplate.java | 2 +-
.../exec/physical/impl/common/HashPartition.java | 137 ++++--
.../physical/impl/common/HashTableTemplate.java | 31 +-
.../exec/physical/impl/join/HashJoinBatch.java | 468 ++++++---------------
.../impl/join/HashJoinHelperSizeCalculator.java | 2 +-
.../join/HashJoinHelperSizeCalculatorImpl.java | 2 +-
...ava => HashJoinMechanicalMemoryCalculator.java} | 5 +-
.../impl/join/HashJoinMemoryCalculator.java | 2 +-
.../impl/join/HashJoinMemoryCalculatorImpl.java | 3 +-
.../exec/physical/impl/join/HashJoinProbe.java | 45 ++
.../physical/impl/join/HashJoinProbeTemplate.java | 425 +++++++++++++++++++
.../exec/physical/impl/join/HashJoinState.java | 2 +-
.../impl/join/HashJoinStateCalculator.java | 2 +-
.../impl/join/HashTableSizeCalculator.java | 2 +-
.../HashTableSizeCalculatorConservativeImpl.java | 2 +-
.../impl/join/HashTableSizeCalculatorLeanImpl.java | 2 +-
.../impl/protocol/OperatorRecordBatch.java | 5 +
.../apache/drill/exec/record/VectorContainer.java | 101 +----
.../exec/server/options/SystemOptionManager.java | 1 +
.../drill/exec/util/MemoryAllocationUtilities.java | 2 +-
.../work/foreman/rm/ThrottledResourceManager.java | 2 +-
.../java-exec/src/main/resources/drill-module.conf | 4 +
.../drill/exec/physical/impl/MockRecordBatch.java | 3 +
.../physical/impl/common/HashPartitionTest.java | 8 +-
.../impl/join/TestBuildSidePartitioningImpl.java | 4 +-
.../join/TestHashJoinHelperSizeCalculatorImpl.java | 2 +-
.../join/TestHashJoinMemoryCalculatorImpl.java | 2 +-
.../exec/physical/impl/join/TestHashJoinSpill.java | 15 +-
...estHashTableSizeCalculatorConservativeImpl.java | 2 +-
.../join/TestHashTableSizeCalculatorLeanImpl.java | 2 +-
.../exec/physical/impl/join/TestPartitionStat.java | 2 +-
.../impl/join/TestPostBuildCalculationsImpl.java | 2 +-
.../physical/impl/unnest/MockLateralJoinBatch.java | 2 +
.../xsort/managed/TestExternalSortInternals.java | 14 +-
.../physical/impl/xsort/managed/TestSorter.java | 1 -
42 files changed, 868 insertions(+), 547 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c48f414..4510511 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -104,14 +104,14 @@ public final class ExecConstants {
public static final BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed");
// Hash Join Options
- public static String HASHJOIN_HASHTABLE_CALC_TYPE_KEY = "exec.hashjoin.hash_table_calc_type";
- public static StringValidator HASHJOIN_HASHTABLE_CALC_TYPE = new StringValidator(HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
- public static String HASHJOIN_SAFETY_FACTOR_KEY = "exec.hashjoin.safety_factor";
- public static DoubleValidator HASHJOIN_SAFETY_FACTOR = new RangeDoubleValidator(HASHJOIN_SAFETY_FACTOR_KEY, 1.0, Double.MAX_VALUE);
- public static String HASHJOIN_HASH_DOUBLE_FACTOR_KEY = "exec.hashjoin.hash_double_factor";
- public static DoubleValidator HASHJOIN_HASH_DOUBLE_FACTOR = new RangeDoubleValidator(HASHJOIN_HASH_DOUBLE_FACTOR_KEY, 1.0, Double.MAX_VALUE);
- public static String HASHJOIN_FRAGMENTATION_FACTOR_KEY = "exec.hashjoin.fragmentation_factor";
- public static DoubleValidator HASHJOIN_FRAGMENTATION_FACTOR = new RangeDoubleValidator(HASHJOIN_FRAGMENTATION_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+ public static final String HASHJOIN_HASHTABLE_CALC_TYPE_KEY = "exec.hashjoin.hash_table_calc_type";
+ public static final StringValidator HASHJOIN_HASHTABLE_CALC_TYPE = new StringValidator(HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+ public static final String HASHJOIN_SAFETY_FACTOR_KEY = "exec.hashjoin.safety_factor";
+ public static final DoubleValidator HASHJOIN_SAFETY_FACTOR = new RangeDoubleValidator(HASHJOIN_SAFETY_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+ public static final String HASHJOIN_HASH_DOUBLE_FACTOR_KEY = "exec.hashjoin.hash_double_factor";
+ public static final DoubleValidator HASHJOIN_HASH_DOUBLE_FACTOR = new RangeDoubleValidator(HASHJOIN_HASH_DOUBLE_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+ public static final String HASHJOIN_FRAGMENTATION_FACTOR_KEY = "exec.hashjoin.fragmentation_factor";
+ public static final DoubleValidator HASHJOIN_FRAGMENTATION_FACTOR = new RangeDoubleValidator(HASHJOIN_FRAGMENTATION_FACTOR_KEY, 1.0, Double.MAX_VALUE);
public static final String HASHJOIN_NUM_ROWS_IN_BATCH_KEY = "exec.hashjoin.num_rows_in_batch";
public static final LongValidator HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_ROWS_IN_BATCH_KEY, 1, 65536);
public static final String HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY = "exec.hashjoin.max_batches_in_memory";
@@ -122,6 +122,8 @@ public final class ExecConstants {
public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0L, Long.MAX_VALUE);
public static final String HASHJOIN_SPILL_DIRS = "drill.exec.hashjoin.spill.directories";
public static final String HASHJOIN_SPILL_FILESYSTEM = "drill.exec.hashjoin.spill.fs";
+ public static final String HASHJOIN_FALLBACK_ENABLED_KEY = "drill.exec.hashjoin.fallback.enabled";
+ public static final BooleanValidator HASHJOIN_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHJOIN_FALLBACK_ENABLED_KEY);
// Hash Aggregate Options
public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index b93237e..8cf79d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import com.google.common.base.Preconditions;
@@ -114,9 +115,10 @@ public abstract class AbstractBase implements PhysicalOperator {
/**
* Any operator that supports spilling should override this method (and return true)
* @return false
+ * @param queryContext
*/
@Override @JsonIgnore
- public boolean isBufferedOperator() { return false; }
+ public boolean isBufferedOperator(QueryContext queryContext) { return false; }
@Override
public String getUserName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index d9fcd3a..35138c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.graph.GraphValue;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
@@ -91,9 +92,10 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
/**
*
* @return True iff this operator manages its memory (including disk spilling)
+ * @param queryContext
*/
@JsonIgnore
- boolean isBufferedOperator();
+ boolean isBufferedOperator(QueryContext queryContext);
// public void setBufferedOperator(boolean bo);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index f0e88b4..9ead21c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.config;
import java.util.List;
import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
@@ -63,7 +64,8 @@ public class ExternalSort extends Sort {
/**
* The External Sort operator supports spilling
* @return true
+ * @param queryContext
*/
@Override
- public boolean isBufferedOperator() { return true; }
+ public boolean isBufferedOperator(QueryContext queryContext) { return true; }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index f8e6d8e..51f34a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.physical.config;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -95,8 +97,13 @@ public class HashAggregate extends AbstractSingle {
}
/**
* The Hash Aggregate operator supports spilling
- * @return true
+ * @return true (unless a single partition is forced)
+ * @param queryContext
*/
@Override
- public boolean isBufferedOperator() { return true; }
+ public boolean isBufferedOperator(QueryContext queryContext) {
+ // In case forced to use a single partition - do not consider this a buffered op (when memory is divided)
+ return queryContext == null ||
+ 1 < (int)queryContext.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR) ;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index b56950a..48d977e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -17,63 +17,77 @@
*/
package org.apache.drill.exec.physical.config;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.calcite.rel.core.JoinRelType;
+
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.exec.physical.base.AbstractJoinPop;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-
-import java.util.List;
@JsonTypeName("hash-join")
public class HashJoinPOP extends AbstractJoinPop {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class);
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class);
- @JsonCreator
- public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right,
+ @JsonCreator
+ public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right,
@JsonProperty("conditions") List<JoinCondition> conditions,
@JsonProperty("joinType") JoinRelType joinType) {
super(left, right, joinType, null, conditions);
Preconditions.checkArgument(joinType != null, "Join type is missing for HashJoin Pop");
- }
+ }
- @Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.size() == 2);
HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0), children.get(1), conditions, joinType);
newHashJoin.setMaxAllocation(getMaxAllocation());
return newHashJoin;
- }
+ }
- public HashJoinPOP flipIfRight() {
- if (joinType == JoinRelType.RIGHT) {
+ public HashJoinPOP flipIfRight() {
+ if (joinType == JoinRelType.RIGHT) {
List<JoinCondition> flippedConditions = Lists.newArrayList();
for (JoinCondition c : conditions) {
flippedConditions.add(c.flip());
}
return new HashJoinPOP(right, left, flippedConditions, JoinRelType.LEFT);
- } else {
+ } else {
return this;
- }
- }
+ }
+ }
- @Override
- public int getOperatorType() {
+ @Override
+ public int getOperatorType() {
return CoreOperatorType.HASH_JOIN_VALUE;
}
- @Override
- public void setMaxAllocation(long maxAllocation) {
+ /**
+ *
+ * @param maxAllocation The max memory allocation to be set
+ */
+ @Override
+ public void setMaxAllocation(long maxAllocation) {
this.maxAllocation = maxAllocation;
}
- @Override
- public boolean isBufferedOperator() {
- return true;
- }
+ /**
+ * The Hash Join operator supports spilling
+ * @return true (unless a single partition is forced)
+ * @param queryContext the query context whose options are checked (may be null)
+ */
+ @Override
+ public boolean isBufferedOperator(QueryContext queryContext) {
+ // In case forced to use a single partition - do not consider this a buffered op (when memory is divided)
+ return queryContext == null ||
+ 1 < (int)queryContext.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR) ;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index b18a78e..e148278 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -32,8 +32,6 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import com.google.common.base.Supplier;
-
public abstract class BaseRootExec implements RootExec {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRootExec.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 4f6a117..258e8d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -541,7 +541,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
// multiply by the max number of rows in a batch to get the final estimated max size
estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
- // (When there are no aggr functions, use '1' as later code relies on this siisDebze being non-zero)
+ // (When there are no aggr functions, use '1' as later code relies on this size being non-zero)
estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5d0197a..e525530 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.common;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
@@ -35,6 +34,8 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
@@ -68,7 +69,7 @@ import java.util.concurrent.TimeUnit;
public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class);
- public static final String HASH_VALUE_COLUMN_NAME = "Hash_Values";
+ public static final String HASH_VALUE_COLUMN_NAME = "$Hash_Values$";
private int partitionNum = -1; // the current number of this partition, as used by the operator
@@ -107,30 +108,30 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
private String spillFile;
private BufferAllocator allocator;
- private int RECORDS_PER_BATCH;
- ChainedHashTable baseHashTable;
+ private int recordsPerBatch;
private SpillSet spillSet;
private boolean isSpilled; // is this partition spilled ?
private boolean processingOuter; // is (inner done spilling and) now the outer is processed?
- private boolean outerBatchNotNeeded; // when the inner is whole in memory
+ private boolean outerBatchAllocNotNeeded; // when the inner is whole in memory
private RecordBatch buildBatch;
private RecordBatch probeBatch;
private int cycleNum;
+ private int numPartitions;
private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
private long partitionInMemorySize;
private long numInMemoryRecords;
public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
RecordBatch buildBatch, RecordBatch probeBatch,
- int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum) {
+ int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum, int numPartitions) {
this.allocator = allocator;
- this.baseHashTable = baseHashTable;
this.buildBatch = buildBatch;
this.probeBatch = probeBatch;
- this.RECORDS_PER_BATCH = recordsPerBatch;
+ this.recordsPerBatch = recordsPerBatch;
this.spillSet = spillSet;
this.partitionNum = partNum;
this.cycleNum = cycleNum;
+ this.numPartitions = numPartitions;
try {
this.hashTable = baseHashTable.createAndSetupHashTable(null);
@@ -147,7 +148,9 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
}
this.hjHelper = new HashJoinHelper(context, allocator);
tmpBatchesList = new ArrayList<>();
- allocateNewCurrentBatchAndHV();
+ if ( numPartitions > 1 ) {
+ allocateNewCurrentBatchAndHV();
+ }
}
/**
@@ -173,11 +176,11 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
newVC.add(newVV); // add first to allow dealloc in case of an OOM
if (newVV instanceof FixedWidthVector) {
- ((FixedWidthVector) newVV).allocateNew(RECORDS_PER_BATCH);
+ ((FixedWidthVector) newVV).allocateNew(recordsPerBatch);
} else if (newVV instanceof VariableWidthVector) {
- ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * RECORDS_PER_BATCH, RECORDS_PER_BATCH);
+ ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * recordsPerBatch, recordsPerBatch);
} else if (newVV instanceof ObjectVector) {
- ((ObjectVector) newVV).allocateNew(RECORDS_PER_BATCH);
+ ((ObjectVector) newVV).allocateNew(recordsPerBatch);
} else {
newVV.allocateNew();
}
@@ -197,10 +200,10 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
* Allocate a new current Vector Container and current HV vector
*/
public void allocateNewCurrentBatchAndHV() {
- if ( outerBatchNotNeeded ) { return; } // skip when the inner is whole in memory
+ if (outerBatchAllocNotNeeded) { return; } // skip when the inner is whole in memory
currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
- currHVVector.allocateNew(RECORDS_PER_BATCH);
+ currHVVector.allocateNew(recordsPerBatch);
}
/**
@@ -209,8 +212,8 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
int pos = currentBatch.appendRow(buildContainer,ind);
- currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column
- if ( pos + 1 == RECORDS_PER_BATCH ) {
+ currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
+ if ( pos == recordsPerBatch ) {
boolean needsSpill = isSpilled || calc.shouldSpill();
completeAnInnerBatch(true, needsSpill);
}
@@ -222,8 +225,8 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
*/
public void appendOuterRow(int hashCode, int recordsProcessed) {
int pos = currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
- currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column
- if ( pos + 1 == RECORDS_PER_BATCH ) {
+ currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
+ if ( pos == recordsPerBatch ) {
completeAnOuterBatch(true);
}
}
@@ -265,6 +268,42 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
}
}
+ /**
+ * Append the incoming batch (actually only the vectors of that batch) into the tmp list
+ */
+ public void appendBatch(VectorAccessible batch) {
+ assert numPartitions == 1;
+ int recordCount = batch.getRecordCount();
+ currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
+ currHVVector.allocateNew(recordCount /* recordsPerBatch */);
+ try {
+ // For every record in the build batch, hash the key columns and keep the result
+ for (int ind = 0; ind < recordCount; ind++) {
+ int hashCode = getBuildHashCode(ind);
+ currHVVector.getMutator().set(ind, hashCode); // store the hash value in the new HV column
+ }
+ } catch(SchemaChangeException sce) {}
+
+ VectorContainer container = new VectorContainer();
+ List<ValueVector> vectors = Lists.newArrayList();
+
+ for (VectorWrapper<?> v : batch) {
+ TransferPair tp = v.getValueVector().getTransferPair(allocator);
+ tp.transfer();
+ vectors.add(tp.getTo());
+ }
+
+ container.addCollection(vectors);
+ container.add(currHVVector); // the HV vector is added as an extra "column"
+ container.setRecordCount(recordCount);
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+ tmpBatchesList.add(container);
+ partitionBatchesCount++;
+ currHVVector = null;
+ numInMemoryRecords += recordCount;
+ }
+
public void spillThisPartition() {
if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing to spill
logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size());
@@ -300,15 +339,15 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
v.getValueVector().getMutator().setValueCount(numRecords);
}
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false);
+ WritableBatch wBatch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false);
try {
- writer.write(batch, null);
+ writer.write(wBatch, null);
} catch (IOException ioe) {
throw UserException.dataWriteError(ioe)
.message("Hash Join failed to write to output file: " + spillFile)
.build(logger);
} finally {
- batch.clear();
+ wBatch.clear();
}
vc.zeroVectors();
logger.trace("HASH JOIN: Took {} us to spill {} records", writer.time(TimeUnit.MICROSECONDS), numRecords);
@@ -398,20 +437,9 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
return partitionNum;
}
- private void freeCurrentBatchAndHVVector() {
- if ( currentBatch != null ) {
- currentBatch.clear();
- currentBatch = null;
- }
- if ( currHVVector != null ) {
- currHVVector.clear();
- currHVVector = null;
- }
- }
-
- public void closeWriterAndDeleteFile() {
- closeWriterInternal(true);
- }
+ /**
+ * Close the writer without deleting the spill file
+ */
public void closeWriter() { // no deletion !!
closeWriterInternal(false);
processingOuter = true; // After the spill file was closed
@@ -442,7 +470,8 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
}
/**
- * Creates the hash table and join helper for this partition. This method should only be called after all the build side records
+ * Creates the hash table and join helper for this partition.
+ * This method should only be called after all the build side records
* have been consumed.
*/
public void buildContainersHashTableAndHelper() throws SchemaChangeException {
@@ -464,7 +493,6 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
hashTable.updateIncoming(nextBatch, probeBatch );
- // IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector();
IntVector HV_vector = (IntVector) nextBatch.getLast();
for (int recInd = 0; recInd < currentRecordCount; recInd++) {
@@ -473,7 +501,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
hashTable.put(recInd, htIndex, hashCode);
} catch (RetryAfterSpillException RE) {
throw new OutOfMemoryException("HT put");
- } // Hash Join can not retry yet
+ } // Hash Join does not retry
/* Use the global index returned by the hash table, to store
* the current record index and batch index. This will be used
* later when we probe and find a match.
@@ -483,7 +511,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
containers.add(nextBatch);
}
- outerBatchNotNeeded = true; // the inner is whole in memory, no need for an outer batch
+ outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need for an outer batch
}
public void getStats(HashTableStats newStats) {
@@ -493,7 +521,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
/**
* Frees memory allocated to the {@link HashTable} and {@link HashJoinHelper}.
*/
- public void clearHashTableAndHelper() {
+ private void clearHashTableAndHelper() {
if (hashTable != null) {
hashTable.clear();
hashTable = null;
@@ -504,7 +532,22 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
}
}
- public void close() {
+ private void freeCurrentBatchAndHVVector() {
+ if ( currentBatch != null ) {
+ currentBatch.clear();
+ currentBatch = null;
+ }
+ if ( currHVVector != null ) {
+ currHVVector.clear();
+ currHVVector = null;
+ }
+ }
+
+ /**
+ * Free all in-memory allocated structures.
+ * @param deleteFile - whether to delete the spill file or not
+ */
+ public void cleanup(boolean deleteFile) {
freeCurrentBatchAndHVVector();
if (containers != null && !containers.isEmpty()) {
for (VectorContainer vc : containers) {
@@ -515,11 +558,15 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
VectorContainer vc = tmpBatchesList.remove(0);
vc.clear();
}
- closeWriter();
- partitionBatchesCount = 0;
- spillFile = null;
+ closeWriterInternal(deleteFile);
clearHashTableAndHelper();
- if ( containers != null ) { containers.clear(); }
+ if ( containers != null ) {
+ containers.clear();
+ }
+ }
+
+ public void close() {
+ cleanup(true);
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 06e3bcd..bb0b1ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -65,6 +65,8 @@ public abstract class HashTableTemplate implements HashTable {
// Array of batch holders..each batch holder can hold up to BATCH_SIZE entries
private ArrayList<BatchHolder> batchHolders;
+ private int totalBatchHoldersSize; // the size of all batchHolders
+
// Current size of the hash table in terms of number of buckets
private int tableSize = 0;
@@ -483,6 +485,7 @@ public abstract class HashTableTemplate implements HashTable {
// Create the first batch holder
batchHolders = new ArrayList<BatchHolder>();
+ totalBatchHoldersSize = 0;
// First BatchHolder is created when the first put request is received.
try {
@@ -498,6 +501,7 @@ public abstract class HashTableTemplate implements HashTable {
public void updateInitialCapacity(int initialCapacity) {
htConfig = htConfig.withInitialCapacity(initialCapacity);
allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
+ enlargeEmptyHashTableIfNeeded(initialCapacity);
}
@Override
@@ -543,6 +547,7 @@ public abstract class HashTableTemplate implements HashTable {
}
batchHolders.clear();
batchHolders = null;
+ totalBatchHoldersSize = 0;
}
startIndices.clear();
// currentIdxHolder = null; // keep IndexPointer in case HT is reused
@@ -568,6 +573,7 @@ public abstract class HashTableTemplate implements HashTable {
if ( batchAdded ) {
logger.trace("OOM - Removing index {} from the batch holders list",batchHolders.size() - 1);
BatchHolder bh = batchHolders.remove(batchHolders.size() - 1);
+ totalBatchHoldersSize -= BATCH_SIZE;
bh.clear();
}
freeIndex--;
@@ -677,7 +683,7 @@ public abstract class HashTableTemplate implements HashTable {
}
htIdxHolder.value = currentIdx;
return addedBatch ? PutStatus.NEW_BATCH_ADDED :
- ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
+ ( freeIndex + 1 > totalBatchHoldersSize /* batchHolders.size() * BATCH_SIZE */ ) ?
PutStatus.KEY_ADDED_LAST : // the last key in the batch
PutStatus.KEY_ADDED; // otherwise
}
@@ -710,9 +716,9 @@ public abstract class HashTableTemplate implements HashTable {
// currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
// the capacity, we will add a new BatchHolder. Return true if a new batch was added.
private boolean addBatchIfNeeded(int currentIdx) throws SchemaChangeException {
- int totalBatchSize = batchHolders.size() * BATCH_SIZE;
+ // int totalBatchSize = batchHolders.size() * BATCH_SIZE;
- if (currentIdx >= totalBatchSize) {
+ if (currentIdx >= totalBatchHoldersSize) {
BatchHolder bh = newBatchHolder(batchHolders.size(), allocationTracker.getNextBatchHolderSize());
batchHolders.add(bh);
bh.setup();
@@ -721,6 +727,8 @@ public abstract class HashTableTemplate implements HashTable {
}
allocationTracker.commit();
+
+ totalBatchHoldersSize += BATCH_SIZE; // total increased by 1 batch
return true;
}
return false;
@@ -797,6 +805,22 @@ public abstract class HashTableTemplate implements HashTable {
}
/**
+ * Grow a still-empty hash table so that it can hold newNum entries: the table size is
+ * doubled until its load-factor threshold is at least newNum, without reaching MAXIMUM_CAPACITY.
+ * The table must be empty (asserted), so the bucket vector is simply replaced, not rehashed.
+ *
+ * @param newNum the number of entries the table is expected to hold
+ */
+ public void enlargeEmptyHashTableIfNeeded(int newNum) {
+   assert numEntries == 0;
+   if (newNum < threshold) {
+     return; // current size already suffices
+   }
+
+   while (newNum > threshold) {
+     final int doubledSize = tableSize * 2;
+     if (doubledSize >= MAXIMUM_CAPACITY) {
+       break; // cannot grow any further
+     }
+     tableSize = doubledSize;
+     threshold = (int) Math.ceil(doubledSize * htConfig.getLoadFactor());
+   }
+
+   // NOTE(review): the vector is reallocated even when the loop did not grow the table
+   startIndices.clear();
+   startIndices = allocMetadataVector(tableSize, EMPTY_SLOT);
+ }
+
+
+ /**
* Reinit the hash table to its original size, and clear up all its prior batch holder
*
*/
@@ -806,6 +830,7 @@ public abstract class HashTableTemplate implements HashTable {
freeIndex = 0; // all batch holders are gone
// reallocate batch holders, and the hash table to the original size
batchHolders = new ArrayList<BatchHolder>();
+ totalBatchHoldersSize = 0;
startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
}
public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0c46e36..ee7a8a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -37,8 +37,10 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -66,7 +68,24 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;
/**
+ * This class implements the runtime execution for the Hash-Join operator
+ * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
*
+ * This implementation splits the incoming Build side rows into multiple Partitions, thus allowing spilling of
+ * some of these partitions to disk if memory gets tight. Each partition is implemented as a {@link HashPartition}.
+ * After the build phase is over, in the most general case, some of the partitions were spilled, and the others
+ * are in memory. Each of the partitions in memory would get a {@link HashTable} built.
+ * Next the Probe side is read, and each row is key matched with a Build partition. If that partition is in
+ * memory, then the key is used to probe and perform the join, and the results are added to the outgoing batch.
+ * But if that build side partition was spilled, then the matching Probe side partition is spilled as well.
+ * After all the Probe side was processed, we are left with pairs of spilled partitions. Then each pair is
+ * processed individually (that Build partition should be smaller than the original, hence likely fit whole into
+ * memory to allow probing; if not -- see below).
+ * Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (In fact,
+ * the {@link #innerNext() innerNext} method calls itself recursively.) Thus the spilled build partition is
+ * read and divided into new partitions, which in turn may spill again (and again...).
+ * The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
+ * indicating that the number of partitions chosen was too small.
*/
public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
@@ -86,6 +105,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Join conditions
private final List<JoinCondition> conditions;
+
+ // Runtime generated class implementing HashJoinProbe interface
+ private HashJoinProbe hashJoinProbe = null;
+
private final List<NamedExpression> rightExpr;
/**
@@ -120,6 +143,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Schema of the build side
private BatchSchema rightSchema;
+ // Schema of the probe side
+ private BatchSchema probeSchema;
+
private int rightHVColPosition;
private BufferAllocator allocator;
@@ -131,15 +157,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private SpillSet spillSet;
HashJoinPOP popConfig;
- private int cycleNum = 0; // primary, secondary, tertiary, etc.
+ private int cycleNum = 0; // 0 - primary (original incoming), 1 - secondary (reading spilled partitions), 2 - tertiary, etc.
private int originalPartition = -1; // the partition a secondary reads from
- IntVector read_HV_vector; // HV vector that was read from the spilled batch
+ IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
private int maxBatchesInMemory;
/**
* This holds information about the spilled partitions for the build and probe side.
*/
- private static class HJSpilledPartition {
+ public static class HJSpilledPartition {
public int innerSpilledBatches;
public String innerSpillFile;
public int outerSpilledBatches;
@@ -189,6 +215,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
setupHashTable();
}
setupOutputContainerSchema();
+ try {
+ hashJoinProbe = setupHashJoinProbe();
+ } catch (IOException | ClassTransformationException e) {
+ throw new SchemaChangeException(e);
+ }
+
// Build the container schema and set the counts
for (final VectorWrapper<?> w : container) {
w.getValueVector().allocateNew();
@@ -212,7 +244,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return false;
}
- if (checkForEarlyFinish()) {
+ if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
state = BatchState.DONE;
return false;
}
@@ -234,11 +266,16 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
switch (outcome) {
case OK_NEW_SCHEMA:
- // We need to have the schema of the build side even when the build side is empty
- rightSchema = buildBatch.getSchema();
- // position of the new "column" for keeping the hash values (after the real columns)
- rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
- // new schema can also contain records
+ if ( inputIndex == 0 ) {
+ // Indicate that a schema was seen (in case probe side is empty)
+ probeSchema = probeBatch.getSchema();
+ } else {
+ // We need to have the schema of the build side even when the build side is empty
+ rightSchema = buildBatch.getSchema();
+ // position of the new "column" for keeping the hash values (after the real columns)
+ rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
+ // new schema can also contain records
+ }
case OK:
if (recordBatch.getRecordCount() == 0) {
continue;
@@ -265,12 +302,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType);
} else {
- return new MechanicalHashJoinMemoryCalculator(maxBatchesInMemory);
+ return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
}
}
@Override
public IterOutcome innerNext() {
+ // In case incoming was killed before, just cleanup and return
+ if ( wasKilled ) {
+ this.cleanup();
+ super.close();
+ return IterOutcome.NONE;
+ }
+
try {
/* If we are here for the first time, execute the build phase of the
* hash join and setup the run time generated class for the probe side
@@ -280,19 +324,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
executeBuildPhase();
// Update the hash table related stats for the operator
updateStats();
- //
- setupProbe();
+ // Initialize various settings for the probe side
+ hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType, leftUpstream, partitions, cycleNum, container, spilledInners, buildSideIsEmpty, numPartitions, rightHVColPosition);
}
- // Store the number of records projected
-
+ // Try to probe and project, or recursively handle a spilled partition
if ( ! buildSideIsEmpty || // If there are build-side rows
joinType != JoinRelType.INNER) { // or if this is a left/full outer join
// Allocate the memory for the vectors in the output container
allocateVectors();
- outputRecords = probeAndProject();
+ outputRecords = hashJoinProbe.probeAndProject();
/* We are here because of one the following
* 1. Completed processing of all the records and we are done
@@ -314,13 +357,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Free all partitions' in-memory data structures
// (In case need to start processing spilled partitions)
for ( HashPartition partn : partitions ) {
- partn.close();
+ partn.cleanup(false); // clean, but do not delete the spill files !!
}
//
// (recursively) Handle the spilled partitions, if any
//
- if ( !buildSideIsEmpty && !wasKilled && !spilledPartitionsList.isEmpty()) {
+ if ( !buildSideIsEmpty && !spilledPartitionsList.isEmpty()) {
// Get the next (previously) spilled partition to handle as incoming
HJSpilledPartition currSp = spilledPartitionsList.remove(0);
@@ -337,7 +380,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
} else {
probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
leftUpstream = IterOutcome.NONE;
- changeToFinalProbeState();
+ hashJoinProbe.changeToFinalProbeState();
}
// update the cycle num if needed
@@ -356,12 +399,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
this.cleanup();
throw UserException
.unsupportedError()
- .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)\n"
+ .message("Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates)\n"
+ "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions)
.build(logger);
}
}
- logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size());
+ logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches)." +
+ " More {} spilled partitions left.",
+ currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches,
+ currSp.innerSpilledBatches, spilledPartitionsList.size());
state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
@@ -400,12 +446,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private void setupHashTable() throws SchemaChangeException {
final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
- // When DRILL supports Java 8, use the following instead of the for() loop
- // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
- for (int i=0; i<conditions.size(); i++) {
- JoinCondition cond = conditions.get(i);
- comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
- }
+ conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
// Setup the hash table configuration object
List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
@@ -441,16 +482,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
//
// Find out the estimated max batch size, etc
// and compute the max numPartitions possible
+ // See partitionNumTuning()
//
- // numPartitions = 8; // just for initial work; change later
- // partitionMask = 7;
- // bitsInMask = 3;
-
- // SET FROM CONFIGURATION OPTIONS :
- // ================================
- // Set the number of partitions from the configuration (raise to a power of two, if needed)
- // Based on the number of partitions: Set the mask and bit count
partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
@@ -471,7 +505,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Recreate the partitions every time build is initialized
for (int part = 0; part < numPartitions; part++ ) {
partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
- RECORDS_PER_BATCH, spillSet, part, cycleNum);
+ RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions);
}
spilledInners = new HJSpilledPartition[numPartitions];
@@ -526,15 +560,38 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
TARGET_RECORDS_PER_BATCH,
HashTable.DEFAULT_LOAD_FACTOR);
- numPartitions = 1; // We are only using one partition
- canSpill = false; // We cannot spill
- allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory
+ disableSpilling(null);
}
return buildCalc;
}
/**
+ * Disable spilling - use only a single partition and set the memory limit to the max ( 10GB )
+ * @param reason If not null - log this as warning, else check fallback setting to either warn or fail.
+ */
+ private void disableSpilling(String reason) {
+ // Fail, or just issue a warning if a reason was given, or a fallback option is enabled
+ if ( reason == null ) {
+ final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+ if (fallbackEnabled) {
+ logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" +
+ " to use unbounded memory");
+ } else {
+ throw UserException.resourceError().message(String.format("Not enough memory for internal partitioning and fallback mechanism for " +
+ "HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter " +
+ "session/system command or increase memory limit for Drillbit", ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
+ }
+ } else {
+ logger.warn(reason);
+ }
+
+ numPartitions = 1; // We are only using one partition
+ canSpill = false; // We cannot spill
+ allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory
+ }
+
+ /**
* Execute the BUILD phase; first read incoming and split rows into partitions;
* may decide to spill some of the partitions
*
@@ -560,7 +617,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
calc.initialize(doMemoryCalculation);
buildCalc = calc.next();
- // We've sniffed first non empty build and probe batches so we have enough information to createa calculator
+ // We've sniffed first non empty build and probe batches so we have enough information to create a calculator
buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed
buildBatch,
probeBatch,
@@ -608,25 +665,30 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
for (HashPartition partn : partitions) { partn.updateBatches(); }
// Fall through
case OK:
+ // Special treatment (when no spill, and single partition) -- use the incoming vectors as they are (no row copy)
+ if ( numPartitions == 1 ) {
+ partitions[0].appendBatch(buildBatch);
+ break;
+ }
final int currentRecordCount = buildBatch.getRecordCount();
if ( cycleNum > 0 ) {
- read_HV_vector = (IntVector) buildBatch.getContainer().getLast();
+ read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
}
// For every record in the build batch, hash the key columns and keep the result
for (int ind = 0; ind < currentRecordCount; ind++) {
int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
- : read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
+ : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
int currPart = hashCode & partitionMask ;
hashCode >>>= bitsInMask;
// Append the new inner row to the appropriate partition; spill (that partition) if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
}
- if ( read_HV_vector != null ) {
- read_HV_vector.clear();
- read_HV_vector = null;
+ if ( read_right_HV_vector != null ) {
+ read_right_HV_vector.clear();
+ read_right_HV_vector = null;
}
break;
}
@@ -637,8 +699,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Move the remaining current batches into their temp lists, or spill
// them if the partition is spilled. Add the spilled partitions into
// the spilled partitions list
- for (HashPartition partn : partitions) {
- partn.completeAnInnerBatch(false, partn.isSpilled() );
+ if ( numPartitions > 1 ) { // a single partition needs no completion
+ for (HashPartition partn : partitions) {
+ partn.completeAnInnerBatch(false, partn.isSpilled());
+ }
}
HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
@@ -715,7 +779,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
}
- if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ if (probeSchema != null) { // a probe schema was seen (even if the probe side had no rows)
for (final VectorWrapper<?> vv : probeBatch) {
final MajorType inputType = vv.getField().getType();
final MajorType outputType;
@@ -761,6 +825,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return spilledInners[part] != null;
}
+ /**
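+ * Creates the Hash-Join operator's record batch.
+ *
+ * @param popConfig -- the Hash-Join physical operator configuration
+ * @param context -- the fragment context (also the source of the hash-join option settings)
+ * @param left -- probe/outer side incoming input
+ * @param right -- build/inner side incoming input
+ * @throws OutOfMemoryException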
+ */
public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
RecordBatch left, /*Probe side record batch*/
RecordBatch right /*Build side record batch*/
@@ -783,16 +856,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
}
+ this.allocator = oContext.getAllocator();
+
numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
if ( numPartitions == 1 ) { //
- canSpill = false;
- logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
+ disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
}
numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
- this.allocator = oContext.getAllocator();
-
final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
if (memLimit != 0) {
@@ -802,6 +874,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
+ logger.info("Memory limit {} bytes", FileUtils.byteCountToDisplaySize(allocator.getLimit()));
spillSet = new SpillSet(context, popConfig);
// Create empty partitions (in the ctor - covers the case where right side is empty)
@@ -818,10 +891,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
}
- // clean (and deallocate) each partition
+ // clean (and deallocate) each partition, and delete its spill file
for (HashPartition partn : partitions) {
- partn.clearHashTableAndHelper();
- partn.closeWriterAndDeleteFile();
+ partn.close();
}
// delete any spill file left in unread spilled partitions
@@ -896,288 +968,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
@Override
public void close() {
- for ( HashPartition partn : partitions ) {
- partn.close();
+ if ( cycleNum > 0 ) { // spilling happened
+ // In case closing due to cancellation, BaseRootExec.close() does not close the open
+ // SpilledRecordBatch "scanners" as it only knows about the original left/right ops.
+ killIncoming(false);
}
- cleanup();
+ this.cleanup();
super.close();
}
- // ==============================================================
- //
- // Methods used for the probe
- //
- // ============================================================
- private BatchSchema probeSchema;
-
- enum ProbeState {
- PROBE_PROJECT, PROJECT_RIGHT, DONE
- }
-
- private int currRightPartition = 0; // for returning RIGHT/FULL
-
- // Number of records to process on the probe side
- private int recordsToProcess = 0;
-
- // Number of records processed on the probe side
- private int recordsProcessed = 0;
-
- // Indicate if we should drain the next record from the probe side
- private boolean getNextRecord = true;
-
- // Contains both batch idx and record idx of the matching record in the build side
- private int currentCompositeIdx = -1;
-
- // Current state the hash join algorithm is in
- private ProbeState probeState = ProbeState.PROBE_PROJECT;
-
- // For outer or right joins, this is a list of unmatched records that needs to be projected
- private List<Integer> unmatchedBuildIndexes = null;
+ public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+ final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
+ cg.plainJavaCapable(true);
+ // cg.saveCodeForDebugging(true);
- // While probing duplicates, retain current build-side partition in case need to continue
- // probing later on the same chain of duplicates
- private HashPartition currPartition;
+ // No real code generation !!
- /**
- * Various initialization needed to perform the probe
- * Must be called AFTER the build completes
- */
- private void setupProbe() {
- currRightPartition = 0; // In case it's a Right/Full outer join
- recordsProcessed = 0;
- recordsToProcess = 0;
-
- probeSchema = probeBatch.getSchema();
- probeState = ProbeState.PROBE_PROJECT;
-
- // A special case - if the left was an empty file
- if ( leftUpstream == IterOutcome.NONE ){
- changeToFinalProbeState();
- } else {
- this.recordsToProcess = probeBatch.getRecordCount();
- }
-
- // for those outer partitions that need spilling (cause their matching inners spilled)
- // initialize those partitions' current batches and hash-value vectors
- for ( HashPartition partn : partitions ) {
- partn.allocateNewCurrentBatchAndHV();
- }
-
- if ( cycleNum > 0 ) {
- if ( read_HV_vector != null ) { read_HV_vector.clear();}
- if ( leftUpstream != IterOutcome.NONE ) { // Skip when outer spill was empty
- read_HV_vector = (IntVector) probeBatch.getContainer().getLast();
- }
- }
+ final HashJoinProbe hj = context.getImplementationClass(cg);
+ return hj;
}
- private void executeProjectRightPhase(int currBuildPart) {
- while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
- outputRecords =
- container.appendRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
- null /* no probeBatch */, 0 /* no probe index */ );
- recordsProcessed++;
- }
- }
-
- private void executeProbePhase() throws SchemaChangeException {
-
- while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
-
- // Check if we have processed all records in this batch we need to invoke next
- if (recordsProcessed == recordsToProcess) {
-
- // Done processing all records in the previous batch, clean up!
- for (VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
-
- IterOutcome leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-
- switch (leftUpstream) {
- case NONE:
- case NOT_YET:
- case STOP:
- recordsProcessed = 0;
- recordsToProcess = 0;
- changeToFinalProbeState();
- // in case some outer partitions were spilled, need to spill their last batches
- for ( HashPartition partn : partitions ) {
- if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
- partn.completeAnOuterBatch(false);
- // update the partition's spill record with the outer side
- HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
- sp.outerSpillFile = partn.getSpillFile();
- sp.outerSpilledBatches = partn.getPartitionBatchesCount();
-
- partn.closeWriter();
- }
-
- continue;
-
- case OK_NEW_SCHEMA:
- if (probeBatch.getSchema().equals(probeSchema)) {
- for ( HashPartition partn : partitions ) { partn.updateBatches(); }
-
- } else {
- throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
- probeSchema,
- probeBatch.getSchema());
- }
- case OK:
- recordsToProcess = probeBatch.getRecordCount();
- recordsProcessed = 0;
- // If we received an empty batch do nothing
- if (recordsToProcess == 0) {
- continue;
- }
- if ( cycleNum > 0 ) {
- read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
- }
- }
- }
- int probeIndex = -1;
- // Check if we need to drain the next row in the probe side
- if (getNextRecord) {
-
- if ( !buildSideIsEmpty ) {
- int hashCode = ( cycleNum == 0 ) ?
- partitions[0].getProbeHashCode(recordsProcessed)
- : read_HV_vector.getAccessor().get(recordsProcessed);
- int currBuildPart = hashCode & partitionMask ;
- hashCode >>>= bitsInMask;
-
- // Set and keep the current partition (may be used again on subsequent probe calls as
- // inner rows of duplicate key are processed)
- currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
-
- // If the matching inner partition was spilled
- if ( isSpilledInner(currBuildPart) ) {
- // add this row to its outer partition (may cause a spill, when the batch is full)
-
- currPartition.appendOuterRow(hashCode, recordsProcessed);
-
- recordsProcessed++; // done with this outer record
- continue; // on to the next outer record
- }
-
- probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
-
- }
-
- if (probeIndex != -1) {
-
- /* The current probe record has a key that matches. Get the index
- * of the first row in the build side that matches the current key
- * (and record this match in the bitmap, in case of a FULL/RIGHT join)
- */
- currentCompositeIdx = currPartition.getStartIndex(probeIndex);
-
- outputRecords =
- container.appendRow(currPartition.getContainers(), currentCompositeIdx,
- probeBatch.getContainer(), recordsProcessed);
-
- /* Projected single row from the build side with matching key but there
- * may be more rows with the same key. Check if that's the case
- */
- currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
- if (currentCompositeIdx == -1) {
- /* We only had one row in the build side that matched the current key
- * from the probe side. Drain the next row in the probe side.
- */
- recordsProcessed++;
- } else {
- /* There is more than one row with the same key on the build side
- * don't drain more records from the probe side till we have projected
- * all the rows with this key
- */
- getNextRecord = false;
- }
- } else { // No matching key
-
- // If we have a left outer join, project the outer side
- if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-
- outputRecords =
- container.appendOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition);
- }
- recordsProcessed++;
- }
- }
- else { // match the next inner row with the same key
-
- currPartition.setRecordMatched(currentCompositeIdx);
-
- outputRecords =
- container.appendRow(currPartition.getContainers(), currentCompositeIdx,
- probeBatch.getContainer(), recordsProcessed);
-
- currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
-
- if (currentCompositeIdx == -1) {
- // We don't have any more rows matching the current key on the build side, move on to the next probe row
- getNextRecord = true;
- recordsProcessed++;
- }
- }
- }
- }
-
- /**
- * Perform the probe and project the results
- *
- * @return number of output records
- * @throws SchemaChangeException
- */
- private int probeAndProject() throws SchemaChangeException {
-
- outputRecords = 0;
-
- // When handling spilled partitions, the state becomes DONE at the end of each partition
- if ( probeState == ProbeState.DONE ) {
- return outputRecords; // that is zero
- }
-
- if (probeState == ProbeState.PROBE_PROJECT) {
- executeProbePhase();
- }
-
- if (probeState == ProbeState.PROJECT_RIGHT) {
- // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
-
- do {
-
- if (unmatchedBuildIndexes == null) { // first time for this partition ?
- if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right
- // Get this partition's list of build indexes that didn't match any record on the probe side
- unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
- recordsProcessed = 0;
- recordsToProcess = unmatchedBuildIndexes.size();
- }
-
- // Project the list of unmatched records on the build side
- executeProjectRightPhase(currRightPartition);
-
- if ( recordsProcessed < recordsToProcess ) { // more records in this partition?
- return outputRecords; // outgoing is full; report and come back later
- } else {
- currRightPartition++; // on to the next right partition
- unmatchedBuildIndexes = null;
- }
-
- } while ( currRightPartition < numPartitions );
-
- probeState = ProbeState.DONE; // last right partition was handled; we are done now
- }
-
- return outputRecords;
- }
-
- private void changeToFinalProbeState() {
- // We are done with the (left) probe phase.
- // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side
- probeState =
- (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
- ProbeState.DONE; // else we're done
- }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
index f5c826a..6ddd05e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
index 728fde5..a17ea2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
index 8b367dd..618e80e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.drill.exec.physical.impl.join;
import com.google.common.base.Preconditions;
@@ -24,12 +23,12 @@ import org.apache.drill.exec.record.RecordBatch;
import javax.annotation.Nullable;
import java.util.Set;
-public class MechanicalHashJoinMemoryCalculator implements HashJoinMemoryCalculator {
+public class HashJoinMechanicalMemoryCalculator implements HashJoinMemoryCalculator {
private final int maxNumInMemBatches;
private boolean doMemoryCalc;
- public MechanicalHashJoinMemoryCalculator(int maxNumInMemBatches) {
+ public HashJoinMechanicalMemoryCalculator(int maxNumInMemBatches) {
this.maxNumInMemBatches = maxNumInMemBatches;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index f2de0fe..71292a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 5890a42..ed0adc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.join;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
new file mode 100644
index 0000000..f212605
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.record.VectorContainer;
+
+public interface HashJoinProbe {
+ TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+
+ /* The probe side of the hash join can be in the following two states
+ * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
+ * key match and if we do we project the record
+ * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
+ * from the build side that did not match any records on the probe side. For Left outer
+ * case we handle it internally by projecting the record if there isn't a match on the build side
+ * 3. DONE: Once we have projected all possible records we are done
+ */
+ enum ProbeState {
+ PROBE_PROJECT, PROJECT_RIGHT, DONE
+ }
+
+ void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
+ int probeAndProject() throws SchemaChangeException;
+ void changeToFinalProbeState();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
new file mode 100644
index 0000000..75c3073
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class HashJoinProbeTemplate implements HashJoinProbe {
+
+ VectorContainer container; // the outgoing container
+
+ // Probe side record batch
+ private RecordBatch probeBatch;
+
+ private BatchSchema probeSchema;
+
+ // Join type, INNER, LEFT, RIGHT or OUTER
+ private JoinRelType joinType;
+
+ private HashJoinBatch outgoingJoinBatch = null;
+
+ private static final int TARGET_RECORDS_PER_BATCH = 4000;
+
+ // Number of records to process on the probe side
+ private int recordsToProcess = 0;
+
+ // Number of records processed on the probe side
+ private int recordsProcessed = 0;
+
+ // Number of records in the output container
+ private int outputRecords;
+
+ // Indicate if we should drain the next record from the probe side
+ private boolean getNextRecord = true;
+
+ // Contains both batch idx and record idx of the matching record in the build side
+ private int currentCompositeIdx = -1;
+
+ // Current state the hash join algorithm is in
+ private ProbeState probeState = ProbeState.PROBE_PROJECT;
+
+ // For outer or right joins, this is a list of unmatched records that needs to be projected
+ private List<Integer> unmatchedBuildIndexes = null;
+
+ private HashPartition partitions[];
+
+ // While probing duplicates, retain current build-side partition in case need to continue
+ // probing later on the same chain of duplicates
+ private HashPartition currPartition;
+
+ private int currRightPartition = 0; // for returning RIGHT/FULL
+ IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
+ private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
+ private HashJoinBatch.HJSpilledPartition spilledInners[]; // for the outer to find the partition
+ private boolean buildSideIsEmpty = true;
+ private int numPartitions = 1; // must be 2 to the power of bitsInMask
+ private int partitionMask = 0; // numPartitions - 1
+ private int bitsInMask = 0; // number of bits in the MASK
+ private int rightHVColPosition;
+
+ /**
+ * Setup the Hash Join Probe object
+ *
+ * @param probeBatch
+ * @param outgoing
+ * @param joinRelType
+ * @param leftStartState
+ * @param partitions
+ * @param cycleNum
+ * @param container
+ * @param spilledInners
+ * @param buildSideIsEmpty
+ * @param numPartitions
+ * @param rightHVColPosition
+ */
+ @Override
+ public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
+ this.container = container;
+ this.spilledInners = spilledInners;
+ this.probeBatch = probeBatch;
+ this.probeSchema = probeBatch.getSchema();
+ this.joinType = joinRelType;
+ this.outgoingJoinBatch = outgoing;
+ this.partitions = partitions;
+ this.cycleNum = cycleNum;
+ this.buildSideIsEmpty = buildSideIsEmpty;
+ this.numPartitions = numPartitions;
+ this.rightHVColPosition = rightHVColPosition;
+
+ partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+ bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+ probeState = ProbeState.PROBE_PROJECT;
+ this.recordsToProcess = 0;
+ this.recordsProcessed = 0;
+
+ // A special case - if the left was an empty file
+ if ( leftStartState == IterOutcome.NONE ){
+ changeToFinalProbeState();
+ } else {
+ this.recordsToProcess = probeBatch.getRecordCount();
+ }
+
+ // for those outer partitions that need spilling (cause their matching inners spilled)
+ // initialize those partitions' current batches and hash-value vectors
+ for ( HashPartition partn : this.partitions) {
+ partn.allocateNewCurrentBatchAndHV();
+ }
+
+ currRightPartition = 0; // In case it's a Right/Full outer join
+
+ // Initialize the HV vector for the first (already read) left batch
+ if ( this.cycleNum > 0 ) {
+ if ( read_left_HV_vector != null ) { read_left_HV_vector.clear();}
+ if ( leftStartState != IterOutcome.NONE ) { // Skip when outer spill was empty
+ read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast();
+ }
+ }
+ }
+
+ /**
+ * Append the given build side row into the outgoing container
+ * @param buildSrcContainer The container for the right/inner side
+ * @param buildSrcIndex build side index
+ * @return The index for the last column (where the probe side would continue copying)
+ */
+ private int appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) {
+ // "- 1" to skip the last "hash values" added column
+ int lastColIndex = buildSrcContainer.getNumberOfColumns() - 1 ;
+ for (int vectorIndex = 0; vectorIndex < lastColIndex; vectorIndex++) {
+ ValueVector destVector = container.getValueVector(vectorIndex).getValueVector();
+ ValueVector srcVector = buildSrcContainer.getValueVector(vectorIndex).getValueVector();
+ destVector.copyEntry(container.getRecordCount(), srcVector, buildSrcIndex);
+ }
+ return lastColIndex;
+ }
+ /**
+ * Append the given probe side row into the outgoing container, following the build side part
+ * @param probeSrcContainer The container for the left/outer side
+ * @param probeSrcIndex probe side index
+ * @param baseIndex The column index to start copying into (following the build columns)
+ */
+ private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex, int baseIndex) {
+ for (int vectorIndex = baseIndex; vectorIndex < container.getNumberOfColumns(); vectorIndex++) {
+ ValueVector destVector = container.getValueVector(vectorIndex).getValueVector();
+ ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex - baseIndex).getValueVector();
+ destVector.copyEntry(container.getRecordCount(), srcVector, probeSrcIndex);
+ }
+ }
+ /**
+ * A special version of the VectorContainer's appendRow for the HashJoin; (following a probe) it
+ * copies the build and probe sides into the outgoing container. (It uses a composite
+ * index for the build side)
+ * @param buildSrcContainers The containers list for the right/inner side
+ * @param compositeBuildSrcIndex Composite build index
+ * @param probeSrcContainer The single container for the left/outer side
+ * @param probeSrcIndex Index in the outer container
+ * @return Number of rows in this container (after the append)
+ */
+ private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
+ VectorContainer probeSrcContainer, int probeSrcIndex) {
+ int buildBatchIndex = compositeBuildSrcIndex >>> 16;
+ int buildOffset = compositeBuildSrcIndex & 65535;
+ int baseInd = 0;
+ if ( buildSrcContainers != null ) { baseInd = appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset); }
+ if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex, baseInd); }
+ return container.incRecordCount();
+ }
+
+ /**
+ * A customised version of the VectorContainer's appendRow for HashJoin - used for Left
+ * Outer Join when there is no build side match - hence need a base index in
+ * this container's wrappers from where to start appending
+ * @param probeSrcContainer
+ * @param probeSrcIndex
+ * @param baseInd - index of this container's wrapper to start at
+ * @return
+ */
+ private int outputOuterRow(VectorContainer probeSrcContainer, int probeSrcIndex, int baseInd) {
+ appendProbe(probeSrcContainer, probeSrcIndex, baseInd);
+ return container.incRecordCount();
+ }
+
+
+ private void executeProjectRightPhase(int currBuildPart) {
+ while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
+ outputRecords =
+ outputRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
+ null /* no probeBatch */, 0 /* no probe index */ );
+ recordsProcessed++;
+ }
+ }
+
+ private void executeProbePhase() throws SchemaChangeException {
+
+ while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
+
+ // Check if we have processed all records in this batch we need to invoke next
+ if (recordsProcessed == recordsToProcess) {
+
+ // Done processing all records in the previous batch, clean up!
+ for (VectorWrapper<?> wrapper : probeBatch) {
+ wrapper.getValueVector().clear();
+ }
+
+ IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
+
+ switch (leftUpstream) {
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ recordsProcessed = 0;
+ recordsToProcess = 0;
+ changeToFinalProbeState();
+ // in case some outer partitions were spilled, need to spill their last batches
+ for ( HashPartition partn : partitions ) {
+ if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
+ partn.completeAnOuterBatch(false);
+ // update the partition's spill record with the outer side
+ HashJoinBatch.HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
+ sp.outerSpillFile = partn.getSpillFile();
+ sp.outerSpilledBatches = partn.getPartitionBatchesCount();
+
+ partn.closeWriter();
+ }
+
+ continue;
+
+ case OK_NEW_SCHEMA:
+ if (probeBatch.getSchema().equals(probeSchema)) {
+ for ( HashPartition partn : partitions ) { partn.updateBatches(); }
+
+ } else {
+ throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
+ probeSchema,
+ probeBatch.getSchema());
+ }
+ case OK:
+ recordsToProcess = probeBatch.getRecordCount();
+ recordsProcessed = 0;
+ // If we received an empty batch do nothing
+ if (recordsToProcess == 0) {
+ continue;
+ }
+ if ( cycleNum > 0 ) {
+ read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
+ }
+ }
+ }
+
+ int probeIndex = -1;
+ // Check if we need to drain the next row in the probe side
+ if (getNextRecord) {
+
+ if ( !buildSideIsEmpty ) {
+ int hashCode = ( cycleNum == 0 ) ?
+ partitions[0].getProbeHashCode(recordsProcessed)
+ : read_left_HV_vector.getAccessor().get(recordsProcessed);
+ int currBuildPart = hashCode & partitionMask ;
+ hashCode >>>= bitsInMask;
+
+ // Set and keep the current partition (may be used again on subsequent probe calls as
+ // inner rows of duplicate key are processed)
+ currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
+
+ // If the matching inner partition was spilled
+ if ( outgoingJoinBatch.isSpilledInner(currBuildPart) ) {
+ // add this row to its outer partition (may cause a spill, when the batch is full)
+
+ currPartition.appendOuterRow(hashCode, recordsProcessed);
+
+ recordsProcessed++; // done with this outer record
+ continue; // on to the next outer record
+ }
+
+ probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
+
+ }
+
+ if (probeIndex != -1) {
+
+ /* The current probe record has a key that matches. Get the index
+ * of the first row in the build side that matches the current key
+ * (and record this match in the bitmap, in case of a FULL/RIGHT join)
+ */
+ currentCompositeIdx = currPartition.getStartIndex(probeIndex);
+
+ outputRecords =
+ outputRow(currPartition.getContainers(), currentCompositeIdx,
+ probeBatch.getContainer(), recordsProcessed);
+
+ /* Projected single row from the build side with matching key but there
+ * may be more rows with the same key. Check if that's the case
+ */
+ currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+ if (currentCompositeIdx == -1) {
+ /* We only had one row in the build side that matched the current key
+ * from the probe side. Drain the next row in the probe side.
+ */
+ recordsProcessed++;
+ } else {
+ /* There is more than one row with the same key on the build side
+ * don't drain more records from the probe side till we have projected
+ * all the rows with this key
+ */
+ getNextRecord = false;
+ }
+ } else { // No matching key
+
+ // If we have a left outer join, project the outer side
+ if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+
+ outputRecords =
+ outputOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition);
+ }
+ recordsProcessed++;
+ }
+ }
+ else { // match the next inner row with the same key
+
+ currPartition.setRecordMatched(currentCompositeIdx);
+
+ outputRecords =
+ outputRow(currPartition.getContainers(), currentCompositeIdx,
+ probeBatch.getContainer(), recordsProcessed);
+
+ currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+
+ if (currentCompositeIdx == -1) {
+ // We don't have any more rows matching the current key on the build side, move on to the next probe row
+ getNextRecord = true;
+ recordsProcessed++;
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Perform the probe, till the outgoing is full, or no more rows to probe.
+ * Performs the inner or left-outer join while there are left rows,
+ * when done, continue with right-outer, if appropriate.
+ * @return Num of output records
+ * @throws SchemaChangeException
+ */
+ @Override
+ public int probeAndProject() throws SchemaChangeException {
+
+ outputRecords = 0;
+
+ // When handling spilled partitions, the state becomes DONE at the end of each partition
+ if ( probeState == ProbeState.DONE ) {
+ return outputRecords; // that is zero
+ }
+
+ if (probeState == ProbeState.PROBE_PROJECT) {
+ executeProbePhase();
+ }
+
+ if (probeState == ProbeState.PROJECT_RIGHT) {
+ // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
+
+ do {
+
+ if (unmatchedBuildIndexes == null) { // first time for this partition ?
+ if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right
+ // Get this partition's list of build indexes that didn't match any record on the probe side
+ unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
+ recordsProcessed = 0;
+ recordsToProcess = unmatchedBuildIndexes.size();
+ }
+
+ // Project the list of unmatched records on the build side
+ executeProjectRightPhase(currRightPartition);
+
+ if ( recordsProcessed < recordsToProcess ) { // more records in this partition?
+ return outputRecords; // outgoing is full; report and come back later
+ } else {
+ currRightPartition++; // on to the next right partition
+ unmatchedBuildIndexes = null;
+ }
+
+ } while ( currRightPartition < numPartitions );
+
+ probeState = ProbeState.DONE; // last right partition was handled; we are done now
+ }
+
+ return outputRecords;
+ }
+
+ @Override
+ public void changeToFinalProbeState() {
+ // We are done with the (left) probe phase.
+ // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side
+ probeState =
+ (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
+ ProbeState.DONE; // else we're done
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
index 33f22be..7fe9b5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
index 695a68c..4036438 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
index 04d15ff..0bc0a7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
index 05354d5..8575021 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
index 87d8d1b..4f9e585 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 4f0cff8..620f150 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -153,4 +153,9 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
public void close() {
driver.close();
}
+
+  /**
+   * Returns the outgoing container held by this batch's accessor.
+   *
+   * @return the accessor's outgoing {@link VectorContainer}
+   */
+  @Override
+  public VectorContainer getContainer() {
+    final VectorContainer outgoing = batchAccessor.getOutgoingContainer();
+    return outgoing;
+  }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index d966f50..e35bb5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.record;
import java.lang.reflect.Array;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -218,7 +217,7 @@ public class VectorContainer implements VectorAccessible {
* Appends a row taken from a source {@link VectorContainer} to this {@link VectorContainer}.
* @param srcContainer The {@link VectorContainer} to copy a row from.
* @param srcIndex The index of the row to copy from the source {@link VectorContainer}.
- * @return Position where the row was appended
+ * @return The record count after appending, i.e. one past the index where the row was appended
*/
public int appendRow(VectorContainer srcContainer, int srcIndex) {
for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
@@ -226,95 +225,8 @@ public class VectorContainer implements VectorAccessible {
ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector();
destVector.copyEntry(recordCount, srcVector, srcIndex);
}
- int pos = recordCount++;
- initialized = true;
- return pos;
- }
-
- /**
- * This method currently is only used by the Hash Join to return a row composed of build+probe rows
- *
- * This works with non-hyper {@link VectorContainer}s which have no selection vectors.
- * Appends a row taken from two source {@link VectorContainer}s to this {@link VectorContainer}.
- * @param buildSrcContainer The {@link VectorContainer} to copy the first columns of a row from.
- * @param buildSrcIndex The index of the row to copy from the build side source {@link VectorContainer}.
- * @param probeSrcContainer The {@link VectorContainer} to copy the last columns of a row from.
- * @param probeSrcIndex The index of the row to copy from the probe side source {@link VectorContainer}.
- * @return Number of records in the container after appending
- */
- public int appendRowXXX(VectorContainer buildSrcContainer, int buildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) {
- if ( buildSrcContainer != null ) {
- for (int vectorIndex = 0; vectorIndex < buildSrcContainer.wrappers.size(); vectorIndex++) {
- ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
- ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector();
- destVector.copyEntry(recordCount, srcVector, buildSrcIndex);
- }
- }
- if ( probeSrcContainer != null ) {
- int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size();
- for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) {
- ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
- ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex).getValueVector();
- destVector.copyEntry(recordCount, srcVector, probeSrcIndex);
- }
+ return incRecordCount();
}
- recordCount++;
- initialized = true;
- return recordCount;
- }
-
- private int appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) {
- // "- 1" to skip the last "hash values" added column
- int lastIndex = buildSrcContainer.wrappers.size() - 1 ;
- for (int vectorIndex = 0; vectorIndex < lastIndex; vectorIndex++) {
- ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
- ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector();
- destVector.copyEntry(recordCount, srcVector, buildSrcIndex);
- }
- return lastIndex;
- }
- private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex, int baseIndex) {
- // int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size();
- for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) {
- ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
- ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex - baseIndex).getValueVector();
- destVector.copyEntry(recordCount, srcVector, probeSrcIndex);
- }
- }
- /**
- * A special version of appendRow for the HashJoin; uses a composite index for the build side
- * @param buildSrcContainers The containers list for the right side
- * @param compositeBuildSrcIndex Composite build index
- * @param probeSrcContainer The single container for the left/outer side
- * @param probeSrcIndex Index in the outer container
- * @return Number of rows in this container (after the append)
- */
- public int appendRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) {
- int buildBatch = compositeBuildSrcIndex >>> 16;
- int buildOffset = compositeBuildSrcIndex & 65535;
- int baseInd = 0;
- if ( buildSrcContainers != null ) { baseInd = appendBuild(buildSrcContainers.get(buildBatch), buildOffset); }
- if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex, baseInd); }
- recordCount++;
- initialized = true;
- return recordCount;
- }
-
- /**
- * A customised version of the special appendRow for HashJoin - used for Left
- * Outer Join when there is no build side match - hence need a base index in
- * this container's wrappers from where to start appending
- * @param probeSrcContainer
- * @param probeSrcIndex
- * @param baseInd - index of this container's wrapper to start at
- * @return
- */
- public int appendOuterRow(VectorContainer probeSrcContainer, int probeSrcIndex, int baseInd) {
- appendProbe(probeSrcContainer, probeSrcIndex, baseInd);
- recordCount++;
- initialized = true;
- return recordCount;
- }
public TypedFieldId add(ValueVector vv) {
schemaChanged = true;
@@ -459,6 +371,15 @@ public class VectorContainer implements VectorAccessible {
initialized = true;
}
+ /**
+ * Increment the record count
+ * @return the new record count
+ */
+ public int incRecordCount() {
+ initialized = true;
+ return ++recordCount;
+ }
+
@Override
public int getRecordCount() {
Preconditions.checkState(hasRecordCount(), "Record count not set for this vector container");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 183c1f1..663def1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -121,6 +121,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+ new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), // to enable/disable unbounded HashJoin
new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // for tuning
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index 3189535..20ef79d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -68,7 +68,7 @@ public class MemoryAllocationUtilities {
// look for external sorts
final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
for (final PhysicalOperator op : plan.getSortedOperators()) {
- if (op.isBufferedOperator()) {
+ if (op.isBufferedOperator(queryContext)) {
bufferedOpList.add(op);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
index bbbd88c..9c2a2d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
@@ -229,7 +229,7 @@ public class ThrottledResourceManager extends AbstractResourceManager {
@Override
public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
throws RuntimeException {
- if (op.isBufferedOperator()) {
+ if (op.isBufferedOperator(null)) {
value.add(op);
}
visitChildren(op, value);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b4969c0..7d92570 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -421,6 +421,10 @@ drill.exec.options: {
drill.exec.functions.cast_empty_string_to_null: false,
drill.exec.rpc.fragrunner.timeout: 10000,
drill.exec.hashjoin.mem_limit: 0,
+ # Setting to control if HashJoin should fall back to the older behavior of consuming
+ # unbounded memory. It is currently set to true (fallback enabled); once it is
+ # changed to false, the query will fail if there is not enough memory
+ drill.exec.hashjoin.fallback.enabled: true, # should soon be changed to false !!
# Setting to control if HashAgg should fallback to older behavior of consuming
# unbounded memory. In case of 2 phase Agg when available memory is not enough
# to start at least 2 partitions then HashAgg fallbacks to this case. It can be
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 5463974..0c43ab2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -182,6 +182,9 @@ public class MockRecordBatch implements CloseableRecordBatch {
return container.iterator();
}
+ @Override
+ public VectorContainer getContainer() { return container; }
+
public boolean isCompleted() {
return isDone;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index b57329c..6d06434 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -43,8 +43,6 @@ import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.DirectRowSet;
@@ -111,7 +109,8 @@ public class HashPartitionTest {
10,
spillSet,
0,
- 0);
+ 0,
+ 2); // any value other than 1 avoids the special single-partition handling
final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
@@ -206,7 +205,8 @@ public class HashPartitionTest {
10,
spillSet,
0,
- 0);
+ 0,
+ 2);
final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index dc05a70..30c0c73 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
index bd34df4..ed25d78 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
index 4944c87..4fe1fa4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
index 37a6a33..52e6707 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -6,18 +6,18 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.drill.exec.physical.impl.join;
+import ch.qos.logback.classic.Level;
import com.google.common.collect.Lists;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.categories.OperatorTest;
@@ -25,7 +25,7 @@ import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
-import org.junit.Ignore;
+import org.apache.drill.test.LogFixture;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -33,10 +33,16 @@ import java.util.List;
@Category({SlowTest.class, OperatorTest.class})
public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
+
@SuppressWarnings("unchecked")
@Test
// Should spill, including recursive spill
public void testSimpleHashJoinSpill() {
+ LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+ .toConsole()
+ .logger("org.apache.drill", Level.WARN);
+
+
HashJoinPOP joinConf = new HashJoinPOP(null, null,
Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER);
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
@@ -53,6 +59,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
}
+ LogFixture logs = logBuilder.build();
opTestBuilder()
.physicalOperator(joinConf)
.inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
index 4e9f1c7..3f01bca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
index 5cdf524..1bd51fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
index 40eec6a..627d737 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
index 1e9bb8a..5cf7eca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index 0bf995a..36d004c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -230,6 +230,8 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
return count;
}
+ @Override
+ public VectorContainer getContainer() { return null; }
@Override public Iterator<VectorWrapper<?>> iterator() {
return null;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index b8f9563..9c2d5d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -27,8 +27,10 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction;
import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.SubOperatorTest;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -36,6 +38,8 @@ import org.junit.experimental.categories.Category;
public class TestExternalSortInternals extends SubOperatorTest {
private static final int ONE_MEG = 1024 * 1024;
+ @Rule
+ public final BaseDirTestWatcher watcher = new BaseDirTestWatcher();
/**
* Verify defaults configured in drill-override.conf.
@@ -66,7 +70,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
@Test
public void testConfigOverride() {
// Verify the various HOCON ways of setting memory
- OperatorFixture.Builder builder = new OperatorFixture.Builder();
+ OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
builder.configBuilder()
.put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K")
.put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 10)
@@ -92,7 +96,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
*/
@Test
public void testConfigLimits() {
- OperatorFixture.Builder builder = new OperatorFixture.Builder();
+ OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
builder.configBuilder()
.put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, SortConfig.MIN_MERGE_LIMIT - 1)
.put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, SortConfig.MIN_SPILL_FILE_SIZE - 1)
@@ -414,7 +418,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
int batchSizeConstraint = ONE_MEG / 2;
int mergeSizeConstraint = ONE_MEG;
- OperatorFixture.Builder builder = new OperatorFixture.Builder();
+ OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
builder.configBuilder()
.put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstraint)
.put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, batchSizeConstraint)
@@ -470,7 +474,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
// No artificial merge limit
int mergeLimitConstraint = 100;
- OperatorFixture.Builder builder = new OperatorFixture.Builder();
+ OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
builder.configBuilder()
.put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
.build();
@@ -599,7 +603,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
public void testMergeLimit() {
// Constrain merge width
int mergeLimitConstraint = 5;
- OperatorFixture.Builder builder = new OperatorFixture.Builder();
+ OperatorFixture.Builder builder = new OperatorFixture.Builder(watcher);
builder.configBuilder()
.put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
.build();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
index b09b865..0a6d7f9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
@@ -49,7 +49,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
-import org.junit.Rule;
import org.junit.Test;
import com.google.common.collect.Lists;
--
To stop receiving notification emails like this one, please contact
boaz@apache.org.