Posted to commits@drill.apache.org by bo...@apache.org on 2018/05/17 21:57:56 UTC
[drill] 01/03: DRILL-6027: Initial implementation of HashJoin spill, without memory limits checks yet
This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 13e72c1ed846a4c2a5ee18129e773a3e2c215cdc
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Wed Feb 14 12:36:39 2018 -0800
DRILL-6027: Initial implementation of HashJoin spill, without memory limits checks yet
---
.../test/java/org/apache/drill/test/DrillTest.java | 2 +-
.../java/org/apache/drill/exec/ExecConstants.java | 14 +
.../apache/drill/exec/cache/VectorSerializer.java | 3 +-
.../drill/exec/physical/impl/BaseRootExec.java | 7 +-
.../apache/drill/exec/physical/impl/ScanBatch.java | 4 +
.../physical/impl/aggregate/HashAggTemplate.java | 19 +-
.../impl/aggregate/SpilledRecordbatch.java | 11 +-
.../physical/impl/common/ChainedHashTable.java | 14 +-
.../exec/physical/impl/common/HashPartition.java | 500 +++++++++++
.../drill/exec/physical/impl/common/HashTable.java | 10 +-
.../physical/impl/common/HashTableTemplate.java | 59 +-
.../exec/physical/impl/join/HashJoinBatch.java | 945 +++++++++++++++------
.../exec/physical/impl/join/HashJoinHelper.java | 4 +
.../exec/physical/impl/join/HashJoinProbe.java | 53 --
.../physical/impl/join/HashJoinProbeTemplate.java | 261 ------
.../drill/exec/physical/impl/spill/SpillSet.java | 5 +
.../unorderedreceiver/UnorderedReceiverBatch.java | 5 +
.../validate/IteratorValidatorBatchIterator.java | 5 +
.../drill/exec/record/AbstractRecordBatch.java | 5 +
.../org/apache/drill/exec/record/RecordBatch.java | 7 +
.../apache/drill/exec/record/SchemalessBatch.java | 3 +
.../drill/exec/record/SimpleRecordBatch.java | 5 +
.../apache/drill/exec/record/VectorContainer.java | 120 ++-
.../exec/server/options/SystemOptionManager.java | 5 +
.../java-exec/src/main/resources/drill-module.conf | 18 +
.../exec/physical/impl/join/TestHashJoinSpill.java | 123 +++
.../exec/physical/unit/MiniPlanUnitTestBase.java | 2 +-
.../exec/physical/unit/PhysicalOpUnitTestBase.java | 9 +-
.../org/apache/drill/test/DrillTestWrapper.java | 14 +-
.../org/apache/drill/test/rowSet/RowSetBatch.java | 3 +
exec/java-exec/src/test/resources/empty.json | 0
31 files changed, 1612 insertions(+), 623 deletions(-)
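Before the diff itself, a rough sketch of the spill-and-recurse control flow this commit introduces in HashJoinBatch (simplified and illustrative only -- the real code is in the HashJoinBatch.java and HashPartition.java hunks below): the build side is hash-partitioned, partitions that do not fit in memory are spilled to disk, and each spilled partition is later re-read as SpilledRecordbatch "incomings" and processed again with an incremented spill cycle.

    // Illustrative outline only -- condensed from the HashJoinBatch.java hunk below.
    while (!spilledPartitionsList.isEmpty()) {
      HJSpilledPartition sp = spilledPartitionsList.remove(0);
      // Re-read the spilled inner (build) side as the new right-side incoming
      buildBatch = new SpilledRecordbatch(sp.innerSpillFile, sp.innerSpilledBatches,
          context, rightSchema, oContext, spillSet);
      // Re-read the spilled outer (probe) side, if any of its rows were spilled
      probeBatch = sp.outerSpilledBatches > 0
          ? new SpilledRecordbatch(sp.outerSpillFile, sp.outerSpilledBatches,
              context, probeSchema, oContext, spillSet)
          : left;                     // no outer data was spilled for this partition
      cycleNum = sp.cycleNum + 1;     // 1 = spill, 2 = secondary, 3 = tertiary, ...
      state = BatchState.FIRST;       // rebuild the partitions, then probe again
      innerNext();                    // handle the spilled partition like fresh input
    }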
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index 24ec381..28544ab 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -56,7 +56,7 @@ public class DrillTest {
static MemWatcher memWatcher;
static String className;
- @Rule public final TestRule TIMEOUT = new DisableOnDebug(TestTools.getTimeoutRule(100_000));
+ @Rule public final TestRule TIMEOUT = new DisableOnDebug(TestTools.getTimeoutRule(1_000_000));
@Rule public final TestLogReporter logOutcome = LOG_OUTCOME;
@Rule public final TestRule REPEAT_RULE = TestTools.getRepeatRule(false);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 671ce4b..57ecdf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -103,6 +103,20 @@ public final class ExecConstants {
public static final BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed");
+ // Hash Join Options
+ public static final String HASHJOIN_NUM_ROWS_IN_BATCH_KEY = "exec.hashjoin.num_rows_in_batch";
+ public static final LongValidator HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_ROWS_IN_BATCH_KEY, 1, 65536);
+ public static final String HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY = "exec.hashjoin.max_batches_in_memory";
+ public static final LongValidator HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY, 1, 65536);
+ public static final String HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY = "exec.hashjoin.max_batches_per_partition";
+ public static final LongValidator HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY, 1, 65536);
+ public static final String HASHJOIN_NUM_PARTITIONS_KEY = "exec.hashjoin.num_partitions";
+ public static final LongValidator HASHJOIN_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
+ public static final String HASHJOIN_MAX_MEMORY_KEY = "exec.hashjoin.mem_limit";
+ public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
+ public static final String HASHJOIN_SPILL_DIRS = "drill.exec.hashjoin.spill.directories";
+ public static final String HASHJOIN_SPILL_FILESYSTEM = "drill.exec.hashjoin.spill.fs";
+
// Hash Aggregate Options
public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
public static final LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
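The new exec.hashjoin.* options above are read through the option manager at run time. A minimal sketch of such a read follows (the actual reads are in the HashJoinBatch.java hunk below, e.g. in delayedSetup()):

    // Sketch only: picking up the new Hash Join options inside the operator.
    int numPartitions = (int) context.getOptions()
        .getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR); // 1 disables spilling
    long memLimit = context.getOptions()
        .getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);     // memory checks are still TODO in this commit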
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
index 03ea11e..b5beb62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
@@ -91,7 +91,7 @@ public class VectorSerializer {
final DrillBuf[] incomingBuffers = batch.getBuffers();
final UserBitShared.RecordBatchDef batchDef = batch.getDef();
- bytesWritten = batchDef.getSerializedSize();
+ int bytesWritten = batchDef.getSerializedSize();
/* Write the metadata to the file */
batchDef.writeDelimitedTo(output);
@@ -115,6 +115,7 @@ public class VectorSerializer {
}
timeNs += timerContext.stop();
+ this.bytesWritten += bytesWritten;
return bytesWritten;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 76fd642..b18a78e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -139,12 +139,7 @@ public abstract class BaseRootExec implements RootExec {
// close all operators.
if (operators != null) {
- final DeferredException df = new DeferredException(new Supplier<Exception>() {
- @Override
- public Exception get() {
- return new RuntimeException("Error closing operators");
- }
- });
+ final DeferredException df = new DeferredException();
for (final CloseableRecordBatch crb : operators) {
df.suppressingClose(crb);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index ea943b2..4a62752 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -487,6 +487,10 @@ public class ScanBatch implements CloseableRecordBatch {
this.getClass().getCanonicalName()));
}
+ @Override
+ public VectorContainer getContainer() {
+ return container;
+ }
/**
* Verify list of implicit column values is valid input:
* - Either implicit column list is empty;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 4c54080..368fd2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -179,7 +179,7 @@ public abstract class HashAggTemplate implements HashAggregator {
NUM_RESIZING,
RESIZING_TIME_MS,
NUM_PARTITIONS,
- SPILLED_PARTITIONS, // number of partitions spilled to disk
+ SPILLED_PARTITIONS, // number of original partitions spilled to disk
SPILL_MB, // Number of MB of data spilled to disk. This amount is first written,
// then later re-read. So, disk I/O is twice this amount.
// For first phase aggr -- this is an estimate of the amount of data
@@ -187,8 +187,6 @@ public abstract class HashAggTemplate implements HashAggregator {
SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
;
- // duplicate for hash ag
-
@Override
public int metricId() {
return ordinal();
@@ -201,8 +199,6 @@ public abstract class HashAggTemplate implements HashAggregator {
private int maxOccupiedIdx = -1;
private int batchOutputCount = 0;
- private int capacity = Integer.MAX_VALUE;
-
@SuppressWarnings("resource")
public BatchHolder() {
@@ -233,8 +229,6 @@ public abstract class HashAggTemplate implements HashAggregator {
vector.allocateNew();
}
- capacity = Math.min(capacity, vector.getValueCapacity());
-
aggrValuesContainer.add(vector);
}
success = true;
@@ -464,7 +458,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// initialize every (per partition) entry in the arrays
for (int i = 0; i < numPartitions; i++ ) {
try {
- this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+ this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds);
this.htables[i].setMaxVarcharSize(maxColumnWidth);
} catch (ClassTransformationException e) {
throw UserException.unsupportedError(e)
@@ -490,12 +484,13 @@ public abstract class HashAggTemplate implements HashAggregator {
public RecordBatch getNewIncoming() { return newIncoming; }
private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException {
- baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming
+ baseHashTable.updateIncoming(newIncoming, null); // after a spill - a new incoming
this.incoming = newIncoming;
currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
nextPartitionToReturn = 0;
for (int i = 0; i < numPartitions; i++ ) {
- htables[i].reinit(newIncoming);
+ htables[i].updateIncoming(newIncoming.getContainer(), null);
+ htables[i].reset();
if ( batchHolders[i] != null) {
for (BatchHolder bh : batchHolders[i]) {
bh.clear();
@@ -547,7 +542,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
// multiply by the max number of rows in a batch to get the final estimated max size
estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
- // (When there are no aggr functions, use '1' as later code relies on this size being non-zero)
+ // (When there are no aggr functions, use '1' as later code relies on this size being non-zero)
estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
@@ -1260,7 +1255,7 @@ public abstract class HashAggTemplate implements HashAggregator {
int hashCode;
try {
// htables[0].updateBatches();
- hashCode = htables[0].getHashCode(incomingRowIdx);
+ hashCode = htables[0].getBuildHashCode(incomingRowIdx);
} catch (SchemaChangeException e) {
throw new UnsupportedOperationException("Unexpected schema change", e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index c473b94..c78e2c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -48,6 +48,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
private SpillSet spillSet;
private String spillFile;
VectorAccessibleSerializable vas;
+ private IterOutcome initialOutcome;
public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
this.context = context;
@@ -64,7 +65,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
throw UserException.resourceError(e).build(HashAggBatch.logger);
}
- next(); // initialize the container
+ initialOutcome = next(); // initialize the container
}
@Override
@@ -107,6 +108,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
public VectorContainer getOutgoingContainer() { return container; }
@Override
+ public VectorContainer getContainer() { return container; }
+
+ @Override
public int getRecordCount() { return container.getRecordCount(); }
@Override
@@ -156,6 +160,11 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
}
/**
+ * Return the initial outcome (from the first next() call)
+ */
+ public IterOutcome getInitialOutcome() { return initialOutcome; }
+
+ /**
* Note: ignoring any IO errors (e.g. file not found)
*/
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index e061bdd..89e32a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -116,7 +116,7 @@ public class ChainedHashTable {
private final FragmentContext context;
private final BufferAllocator allocator;
private RecordBatch incomingBuild;
- private final RecordBatch incomingProbe;
+ private RecordBatch incomingProbe;
private final RecordBatch outgoing;
public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
@@ -130,11 +130,12 @@ public class ChainedHashTable {
this.outgoing = outgoing;
}
- public void updateIncoming(RecordBatch incomingBuild) {
+ public void updateIncoming(RecordBatch incomingBuild, RecordBatch incomingProbe) {
this.incomingBuild = incomingBuild;
+ this.incomingProbe = incomingProbe;
}
- public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int numPartitions) throws ClassTransformationException,
+ public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException,
IOException, SchemaChangeException {
CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getOptions());
top.plainJavaCapable(true);
@@ -225,14 +226,13 @@ public class ChainedHashTable {
setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
HashTable ht = context.getImplementationClass(top);
- ht.setup(htConfig, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig);
+ ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, outgoing, htContainerOrig);
return ht;
}
private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
- LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds)
- throws SchemaChangeException {
+ LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds) {
cg.setMappingSet(incomingMapping);
if (keyExprs == null || keyExprs.length == 0) {
@@ -276,7 +276,7 @@ public class ChainedHashTable {
}
private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs,
- TypedFieldId[] htKeyFieldIds) throws SchemaChangeException {
+ TypedFieldId[] htKeyFieldIds) {
cg.setMappingSet(SetValueMapping);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
new file mode 100644
index 0000000..5b4adf1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
+import org.apache.drill.exec.physical.impl.join.HashJoinHelper;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ObjectVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The class HashPartition
+ *
+ * Created to represent an active partition for the Hash-Join operator
+ * (active means: currently receiving data, or its data is being probed; as opposed to fully
+ * spilled partitions).
+ * After all the build/inner data is read for this partition - if all its data is in memory, then
+ * a hash table and a helper are created, and this data will later be probed.
+ * If all this partition's build/inner data was spilled, then it begins to work as an outer
+ * partition (see the flag "processingOuter") -- reusing some of the fields (e.g., currentBatch,
+ * currHVVector, writer, spillFile, partitionBatchesCount) for the outer.
+ */
+public class HashPartition {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class);
+
+ private int partitionNum = -1; // the current number of this partition, as used by the operator
+
+ private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
+ private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
+
+ private final MajorType HVtype = MajorType.newBuilder()
+ .setMinorType(MinorType.INT /* dataType */ )
+ .setMode(DataMode.REQUIRED /* mode */ )
+ .build();
+
+ // The vector containers storing all the inner rows
+ // * Records are retrieved from these containers when there is a matching record
+ // * on the probe side
+ private ArrayList<VectorContainer> containers;
+
+ // While build data is incoming - temporarily keep the list of in-memory
+ // incoming batches, per each partition (these may be spilled at some point)
+ private List<VectorContainer> tmpBatchesList;
+ // A batch and HV vector to hold incoming rows - per each partition
+ private VectorContainer currentBatch; // The current (newest) batch
+ private IntVector currHVVector; // The HV vectors for the currentBatches
+
+ /* Helper class
+ * Maintains linked list of build side records with the same key
+ * Keeps information about which build records have a corresponding
+ * matching key in the probe side (for outer, right joins)
+ */
+ private HashJoinHelper hjHelper;
+
+ // Underlying hashtable used by the hash join
+ private HashTable hashTable;
+
+ private VectorSerializer.Writer writer; // a vector writer for each spilled partition
+ private int partitionBatchesCount; // count number of batches spilled
+ private String spillFile;
+
+ private BufferAllocator allocator;
+ private FragmentContext context;
+ private int RECORDS_PER_BATCH;
+ ChainedHashTable baseHashTable;
+ private SpillSet spillSet;
+ private boolean isSpilled; // is this partition spilled ?
+ private boolean processingOuter; // is (inner done spilling and) now the outer is processed?
+ private boolean outerBatchNotNeeded; // when the inner is whole in memory
+ private RecordBatch buildBatch;
+ private RecordBatch probeBatch;
+ private HashJoinBatch.inMemBatchCounter inMemBatches; // shared among all partitions
+ private int cycleNum;
+
+ public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
+ RecordBatch buildBatch, RecordBatch probeBatch,
+ int recordsPerBatch, SpillSet spillSet, int partNum,
+ HashJoinBatch.inMemBatchCounter inMemBatches, int cycleNum) {
+ this.context = context;
+ this.allocator = allocator;
+ this.baseHashTable = baseHashTable;
+ this.buildBatch = buildBatch;
+ this.probeBatch = probeBatch;
+ this.RECORDS_PER_BATCH = recordsPerBatch;
+ this.spillSet = spillSet;
+ this.partitionNum = partNum;
+ this.inMemBatches = inMemBatches;
+ this.cycleNum = cycleNum;
+
+ try {
+ this.hashTable = baseHashTable.createAndSetupHashTable(null);
+ this.hashTable.setMaxVarcharSize(maxColumnWidth);
+ } catch (ClassTransformationException e) {
+ throw UserException.unsupportedError(e)
+ .message("Code generation error - likely an error in the code.")
+ .build(logger);
+ } catch (IOException e) {
+ throw UserException.resourceError(e)
+ .message("IO Error while creating a hash table.")
+ .build(logger);
+ } catch (SchemaChangeException sce) {
+ throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
+ }
+ this.hjHelper = new HashJoinHelper(context, allocator);
+ tmpBatchesList = new ArrayList<>();
+ allocateNewCurrentBatchAndHV();
+ }
+
+ /**
+ * Allocate a new vector container for either right or left record batch
+ * Add an additional special vector for the hash values
+ * Note: this call may OOM !!
+ * @param rb - either the right or the left record batch
+ * @return the new vector container
+ */
+ private VectorContainer allocateNewVectorContainer(RecordBatch rb) {
+ VectorContainer newVC = new VectorContainer();
+ VectorContainer fromVC = rb.getContainer();
+ Iterator<VectorWrapper<?>> vci = fromVC.iterator();
+ boolean success = false;
+
+ try {
+ while (vci.hasNext()) {
+ VectorWrapper vw = vci.next();
+ // If processing a spilled container, skip the last column (HV)
+ if ( cycleNum > 0 && ! vci.hasNext() ) { break; }
+ ValueVector vv = vw.getValueVector();
+ ValueVector newVV = TypeHelper.getNewVector(vv.getField(), allocator);
+ newVC.add(newVV); // add first to allow dealloc in case of an OOM
+
+ if (newVV instanceof FixedWidthVector) {
+ ((FixedWidthVector) newVV).allocateNew(RECORDS_PER_BATCH);
+ } else if (newVV instanceof VariableWidthVector) {
+ // Need to check - (is this case ever used?) if a varchar falls under ObjectVector which is allocated on the heap !
+ ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * RECORDS_PER_BATCH, RECORDS_PER_BATCH);
+ } else if (newVV instanceof ObjectVector) {
+ ((ObjectVector) newVV).allocateNew(RECORDS_PER_BATCH);
+ } else {
+ newVV.allocateNew();
+ }
+ }
+
+ newVC.setRecordCount(0);
+ inMemBatches.inc(); // one more batch in memory
+ success = true;
+ } finally {
+ if ( !success ) {
+ newVC.clear(); // in case of an OOM
+ }
+ }
+ return newVC;
+ }
+
+ /**
+ * Allocate a new current Vector Container and current HV vector
+ */
+ public void allocateNewCurrentBatchAndHV() {
+ if ( outerBatchNotNeeded ) { return; } // skip when the inner is whole in memory
+ currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
+ currHVVector = new IntVector(MaterializedField.create("Hash_Values", HVtype), allocator);
+ currHVVector.allocateNew(RECORDS_PER_BATCH);
+ }
+
+ /**
+ * Spills if needed
+ */
+ public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, boolean needsSpill) {
+
+ int pos = currentBatch.appendRow(buildContainer,ind);
+ currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column
+ if ( pos + 1 == RECORDS_PER_BATCH ) {
+ completeAnInnerBatch(true, needsSpill);
+ }
+ }
+
+ /**
+ * Outer always spills when batch is full
+ *
+ */
+ public void appendOuterRow(int hashCode, int recordsProcessed) {
+ int pos = currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
+ currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column
+ if ( pos + 1 == RECORDS_PER_BATCH ) {
+ completeAnOuterBatch(true);
+ }
+ }
+
+ public void completeAnOuterBatch(boolean toInitialize) {
+ completeABatch(toInitialize, true);
+ }
+ public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) {
+ completeABatch(toInitialize, needsSpill);
+ }
+ /**
+ * A current batch is full (or no more rows incoming) - complete processing this batch
+ * I.e., add it to its partition's tmp list, if needed - spill that list, and if needed -
+ * (that is, more rows are coming) - initialize with a new current batch for that partition
+ * */
+ private void completeABatch(boolean toInitialize, boolean needsSpill) {
+ if ( currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
+ currentBatch.add(currHVVector);
+ currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ tmpBatchesList.add(currentBatch);
+ partitionBatchesCount++;
+ } else {
+ freeCurrentBatchAndHVVector();
+ }
+ if ( needsSpill ) { // spill this batch/partition and free its memory
+ spillThisPartition(tmpBatchesList, processingOuter ? "outer" : "inner");
+ }
+ if ( toInitialize ) { // allocate a new batch and HV vector
+ allocateNewCurrentBatchAndHV();
+ } else {
+ currentBatch = null;
+ currHVVector = null;
+ }
+ }
+
+ private void spillThisPartition(List<VectorContainer> vcList, String side) {
+ if ( vcList.size() == 0 ) { return; } // in case empty - nothing to spill
+ logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, vcList.size());
+
+ // If this is the first spill for this partition, create an output stream
+ if ( writer == null ) {
+ // A special case - when (outer is) empty
+ if ( vcList.get(0).getRecordCount() == 0 ) {
+ VectorContainer vc = vcList.remove(0);
+ inMemBatches.dec();
+ vc.zeroVectors();
+ return;
+ }
+ String suffix = cycleNum > 0 ? side + "_" + Integer.toString(cycleNum) : side;
+ spillFile = spillSet.getNextSpillFile(suffix);
+
+ try {
+ writer = spillSet.writer(spillFile);
+ } catch (IOException ioe) {
+ throw UserException.resourceError(ioe)
+ .message("Hash Join failed to open spill file: " + spillFile)
+ .build(logger);
+ }
+
+ isSpilled = true;
+ }
+
+ while ( vcList.size() > 0 ) {
+ VectorContainer vc = vcList.remove(0);
+ inMemBatches.dec();
+
+ int numRecords = vc.getRecordCount();
+ if (numRecords == 0) { // Spilling should skip an empty batch
+ vc.zeroVectors();
+ continue;
+ }
+
+ // set the value count for outgoing batch value vectors
+ for (VectorWrapper<?> v : vc) {
+ v.getValueVector().getMutator().setValueCount(numRecords);
+ }
+
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false);
+ try {
+ writer.write(batch, null);
+ } catch (IOException ioe) {
+ throw UserException.dataWriteError(ioe)
+ .message("Hash Join failed to write to output file: " + spillFile)
+ .build(logger);
+ } finally {
+ batch.clear();
+ }
+ vc.zeroVectors();
+ logger.trace("HASH JOIN: Took {} us to spill {} records", writer.time(TimeUnit.MICROSECONDS), numRecords);
+
+ }
+ }
+
+ //
+ // ===== Methods to probe the hash table and to get indices out of the helper =======
+ //
+
+ public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException {
+ return hashTable.probeForKey(recordsProcessed, hashCode);
+ }
+ public int getStartIndex(int probeIndex) {
+ /* The current probe record has a key that matches. Get the index
+ * of the first row in the build side that matches the current key
+ */
+ int compositeIndex = hjHelper.getStartIndex(probeIndex);
+ /* Record in the build side at currentCompositeIdx has a matching record in the probe
+ * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
+ * join we keep track of which records we need to project at the end
+ */
+ hjHelper.setRecordMatched(compositeIndex);
+ return compositeIndex;
+ }
+ public int getNextIndex(int compositeIndex) {
+ // in case of inner rows with duplicate keys, get the next one
+ return hjHelper.getNextIndex(compositeIndex);
+ }
+ public void setRecordMatched(int compositeIndex) {
+ hjHelper.setRecordMatched(compositeIndex);
+ }
+ public List<Integer> getNextUnmatchedIndex() {
+ return hjHelper.getNextUnmatchedIndex();
+ }
+ //
+ // =====================================================================================
+ //
+
+ public int getBuildHashCode(int ind) throws SchemaChangeException {
+ return hashTable.getBuildHashCode(ind);
+ }
+ public int getProbeHashCode(int ind) throws SchemaChangeException {
+ return hashTable.getProbeHashCode(ind);
+ }
+ public ArrayList<VectorContainer> getContainers() {
+ return containers;
+ }
+
+ public void updateBatches() throws SchemaChangeException {
+ hashTable.updateBatches();
+ }
+ public boolean isSpilled() {
+ return isSpilled;
+ }
+ public String getSpillFile() {
+ return spillFile;
+ }
+
+ public int getPartitionBatchesCount() {
+ return partitionBatchesCount;
+ }
+ public int getPartitionNum() {
+ return partitionNum;
+ }
+
+ private void freeCurrentBatchAndHVVector() {
+ if ( currentBatch != null ) {
+ inMemBatches.dec();
+ currentBatch.clear();
+ currentBatch = null;
+ }
+ if ( currHVVector != null ) {
+ currHVVector.clear();
+ currHVVector = null;
+ }
+ }
+
+ public void closeWriterAndDeleteFile() {
+ closeWriterInternal(true);
+ }
+ public void closeWriter() { // no deletion !!
+ closeWriterInternal(false);
+ processingOuter = true; // After the spill file was closed
+ }
+ /**
+ * If exists - close the writer for this partition
+ *
+ * @param doDeleteFile Also delete the associated file
+ */
+ private void closeWriterInternal(boolean doDeleteFile) {
+ try {
+ if ( writer != null ) {
+ spillSet.close(writer);
+ }
+ if ( doDeleteFile && spillFile != null ) {
+ spillSet.delete(spillFile);
+ }
+ } catch (IOException ioe) {
+ throw UserException.resourceError(ioe)
+ .message("IO Error while closing %s spill file %s",
+ doDeleteFile ? "and deleting" : "",
+ spillFile)
+ .build(logger);
+ }
+ spillFile = null;
+ writer = null;
+ partitionBatchesCount = 0;
+ }
+
+ /**
+ * Build this partition's containers, hash table and helper -- called once all the build-side
+ * batches of an in-memory (non-spilled) partition have been collected.
+ */
+ public void buildContainersHashTableAndHelper() throws SchemaChangeException {
+ if ( isSpilled ) { return; } // no building for spilled partitions
+ containers = new ArrayList<>();
+ for (int curr = 0; curr < partitionBatchesCount; curr++) {
+ VectorContainer nextBatch = tmpBatchesList.get(curr);
+ final int currentRecordCount = nextBatch.getRecordCount();
+
+ // For every incoming build batch, we create a matching helper batch
+ hjHelper.addNewBatch(currentRecordCount);
+
+ // Holder contains the global index where the key is hashed into using the hash table
+ final IndexPointer htIndex = new IndexPointer();
+
+ assert nextBatch != null;
+ assert probeBatch != null;
+
+ hashTable.updateIncoming(nextBatch, probeBatch );
+
+ // IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector();
+ IntVector HV_vector = (IntVector) nextBatch.getLast();
+
+ for (int recInd = 0; recInd < currentRecordCount; recInd++) {
+ int hashCode = HV_vector.getAccessor().get(recInd);
+ try {
+ hashTable.put(recInd, htIndex, hashCode);
+ } catch (RetryAfterSpillException RE) {
+ throw new OutOfMemoryException("HT put");
+ } // Hash Join can not retry yet
+ /* Use the global index returned by the hash table, to store
+ * the current record index and batch index. This will be used
+ * later when we probe and find a match.
+ */
+ hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd);
+ }
+
+ containers.add(nextBatch);
+ }
+ outerBatchNotNeeded = true; // the inner is whole in memory, no need for an outer batch
+ }
+
+ public void getStats(HashTableStats newStats) {
+ hashTable.getStats(newStats);
+ }
+
+ public void clearHashTableAndHelper() {
+ if (hashTable != null) {
+ hashTable.clear();
+ hashTable = null;
+ }
+ if (hjHelper != null) {
+ hjHelper.clear();
+ hjHelper = null;
+ }
+ }
+
+ public void close() {
+ freeCurrentBatchAndHVVector();
+ if (containers != null && !containers.isEmpty()) {
+ for (VectorContainer vc : containers) {
+ vc.clear();
+ }
+ }
+ while ( tmpBatchesList.size() > 0 ) {
+ VectorContainer vc = tmpBatchesList.remove(0);
+ inMemBatches.dec();
+ vc.clear();
+ }
+ closeWriter();
+ partitionBatchesCount = 0;
+ spillFile = null;
+ clearHashTableAndHelper();
+ if ( containers != null ) { containers.clear(); }
+ }
+
+} // class HashPartition
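To make the HashPartition lifecycle concrete, here is a hedged usage sketch (values and the surrounding loop are illustrative; the real call sites are in the HashJoinBatch.java hunk further down). Build rows are appended together with their hash value, the last partial batch is flushed, and only a partition that never spilled gets an in-memory hash table and helper:

    // Sketch only -- approximate driver for one partition during the build phase
    // (SchemaChangeException handling elided).
    HashPartition part = new HashPartition(context, allocator, baseHashTable,
        buildBatch, probeBatch, RECORDS_PER_BATCH, spillSet,
        0 /* partNum */, inMemBatches, 0 /* cycleNum */);

    for (int row = 0; row < buildBatch.getRecordCount(); row++) {
      int hashCode = part.getBuildHashCode(row);
      part.appendInnerRow(buildBatch.getContainer(), row, hashCode, false /* needsSpill */);
    }
    part.completeAnInnerBatch(false, part.isSpilled()); // flush the last partial batch

    if (part.isSpilled()) {
      part.closeWriter();                          // also flips the partition to "processingOuter"
    } else {
      part.buildContainersHashTableAndHelper();    // hash table + HashJoinHelper for probing
    }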
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index a5eb1f2..ed8f388 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -46,16 +46,18 @@ public interface HashTable {
int BATCH_SIZE = Character.MAX_VALUE + 1;
int BATCH_MASK = 0x0000FFFF;
- void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
+ void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
VectorContainer htContainerOrig);
void updateBatches() throws SchemaChangeException;
- int getHashCode(int incomingRowIdx) throws SchemaChangeException;
+ int getBuildHashCode(int incomingRowIdx) throws SchemaChangeException;
+
+ int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException;
PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
- int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
+ int probeForKey(int incomingRowIdx, int hashCode) throws SchemaChangeException;
void getStats(HashTableStats stats);
@@ -65,7 +67,7 @@ public interface HashTable {
void clear();
- void reinit(RecordBatch newIncoming);
+ public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe);
void reset();
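The interface change above replaces the old getHashCode()/containsKey(isProbe) pair with explicit build-side and probe-side calls. A short sketch of the new probe-side calling sequence (illustrative variable names; the real usage is in HashPartition.probeForKey() above and in the probe logic added to HashJoinBatch.java below):

    // Sketch of the new probe-side convention (SchemaChangeException handling elided).
    int hashCode = hashTable.getProbeHashCode(probeRowIdx);            // hash the probe-side key
    int compositeIndex = hashTable.probeForKey(probeRowIdx, hashCode);
    if (compositeIndex != -1) {
      // Key found: compositeIndex encodes the build batch number and the offset within it.
      // Marking the match is what RIGHT/FULL outer joins use to project unmatched build rows later.
      hjHelper.setRecordMatched(compositeIndex);
    }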
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 64f8144..b7a7f7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -79,7 +79,7 @@ public abstract class HashTableTemplate implements HashTable {
private BufferAllocator allocator;
// The incoming build side record batch
- private RecordBatch incomingBuild;
+ private VectorContainer incomingBuild;
// The incoming probe side record batch (may be null)
private RecordBatch incomingProbe;
@@ -417,7 +417,7 @@ public abstract class HashTableTemplate implements HashTable {
@RuntimeOverridden
protected void setupInterior(
- @Named("incomingBuild") RecordBatch incomingBuild,
+ @Named("incomingBuild") VectorContainer incomingBuild,
@Named("incomingProbe") RecordBatch incomingProbe,
@Named("outgoing") RecordBatch outgoing,
@Named("htContainer") VectorContainer htContainer) throws SchemaChangeException {
@@ -447,7 +447,7 @@ public abstract class HashTableTemplate implements HashTable {
@Override
- public void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
+ public void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
float loadf = htConfig.getLoadFactor();
int initialCap = htConfig.getInitialCapacity();
@@ -572,10 +572,31 @@ public abstract class HashTableTemplate implements HashTable {
throw new RetryAfterSpillException();
}
- public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
+ /**
+ * Return the Hash Value for the row in the Build incoming batch at index:
+ * (For Hash Aggregate there's no "Build" side -- only one batch - this one)
+ *
+ * @param incomingRowIdx
+ * @return
+ * @throws SchemaChangeException
+ */
+ @Override
+ public int getBuildHashCode(int incomingRowIdx) throws SchemaChangeException {
return getHashBuild(incomingRowIdx, 0);
}
+ /**
+ * Return the Hash Value for the row in the Probe incoming batch at index:
+ *
+ * @param incomingRowIdx
+ * @return
+ * @throws SchemaChangeException
+ */
+ @Override
+ public int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException {
+ return getHashProbe(incomingRowIdx, 0);
+ }
+
/** put() uses the hash code (from gethashCode() above) to insert the key(s) from the incoming
* row into the hash table. The code selects the bucket in the startIndices, then the keys are
* placed into the chained list - by storing the key values into a batch, and updating its
@@ -585,7 +606,7 @@ public abstract class HashTableTemplate implements HashTable {
*
* @param incomingRowIdx - position of the incoming row
* @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch)
- * @param hashCode - computed over the key(s) by calling getHashCode()
+ * @param hashCode - computed over the key(s) by calling getBuildHashCode()
* @return Status - the key(s) was ADDED or was already PRESENT
*/
@Override
@@ -659,17 +680,24 @@ public abstract class HashTableTemplate implements HashTable {
PutStatus.KEY_ADDED; // otherwise
}
- // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
- @Override
- public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException {
- int seedValue = 0;
- int hash = isProbe ? getHashProbe(incomingRowIdx, seedValue) : getHashBuild(incomingRowIdx, seedValue);
- int bucketIndex = getBucketIndex(hash, numBuckets());
+ /**
+ * Return -1 if Probe-side key is not found in the (build-side) hash table.
+ * Otherwise, return the global index of the key
+ *
+ *
+ * @param incomingRowIdx
+ * @param hashCode - The hash code for the Probe-side key
+ * @return -1 if key is not found, else return the global index of the key
+ * @throws SchemaChangeException
+ */
+ @Override
+ public int probeForKey(int incomingRowIdx, int hashCode) throws SchemaChangeException {
+ int bucketIndex = getBucketIndex(hashCode, numBuckets());
for ( currentIdxHolder.value = startIndices.getAccessor().get(bucketIndex);
currentIdxHolder.value != EMPTY_SLOT; ) {
BatchHolder bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK);
- if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) {
+ if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, true /* isProbe */)) {
return currentIdxHolder.value;
}
}
@@ -776,9 +804,10 @@ public abstract class HashTableTemplate implements HashTable {
batchHolders = new ArrayList<BatchHolder>();
startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
}
- public void reinit(RecordBatch newIncoming) {
+ public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe) {
incomingBuild = newIncoming;
- reset();
+ incomingProbe = newIncomingProbe;
+ // reset();
try {
updateBatches(); // Needed to update the value vectors in the generated code with the new incoming
} catch (SchemaChangeException e) {
@@ -806,7 +835,7 @@ public abstract class HashTableTemplate implements HashTable {
public void setMaxVarcharSize(int size) { maxVarcharSize = size; }
// These methods will be code-generated in the context of the outer class
- protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException;
+ protected abstract void doSetup(@Named("incomingBuild") VectorContainer incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException;
protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx, @Named("seedValue") int seedValue) throws SchemaChangeException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index b126255..998e0b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -20,9 +20,12 @@ package org.apache.drill.exec.physical.impl.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
+
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
@@ -31,43 +34,37 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
-import org.apache.drill.exec.physical.impl.common.IndexPointer;
import org.apache.drill.exec.physical.impl.common.Comparator;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
-
public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
- public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
- public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
+ protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
+
+ private int RECORDS_PER_BATCH = 128; // 1024; // internal batches
+ private static final int TARGET_RECORDS_PER_BATCH = 4000;
// Join type, INNER, LEFT, RIGHT or OUTER
private final JoinRelType joinType;
@@ -77,84 +74,84 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private final List<Comparator> comparators;
- // Runtime generated class implementing HashJoinProbe interface
- private HashJoinProbe hashJoinProbe = null;
+ // Fields used for partitioning
+ private int numPartitions = 1; // must be 2 to the power of bitsInMask (set in setup())
+ private int partitionMask = 0; // numPartitions - 1
+ private int bitsInMask = 0; // number of bits in the MASK
+ private ChainedHashTable baseHashTable;
+ private boolean buildSideIsEmpty = true;
+ private boolean canSpill = true;
+ private boolean wasKilled; // a kill was received, may need to clean spilled partns
- /* Helper class
- * Maintains linked list of build side records with the same key
- * Keeps information about which build records have a corresponding
- * matching key in the probe side (for outer, right joins)
- */
- private HashJoinHelper hjHelper = null;
-
- // Underlying hashtable used by the hash join
- private HashTable hashTable = null;
-
- /* Hyper container to store all build side record batches.
- * Records are retrieved from this container when there is a matching record
- * on the probe side
- */
- private ExpandableHyperContainer hyperContainer;
+ HashPartition partitions[];
// Number of records in the output container
private int outputRecords;
- // Current batch index on the build side
- private int buildBatchIndex = 0;
-
// Schema of the build side
private BatchSchema rightSchema = null;
+ private final HashTableStats htStats = new HashTableStats();
+
+ private final MajorType HVtype = MajorType.newBuilder()
+ .setMinorType(org.apache.drill.common.types.TypeProtos.MinorType.INT /* dataType */ )
+ .setMode(DataMode.REQUIRED /* mode */ )
+ .build();
+
+ private int rightHVColPosition;
+ private BufferAllocator allocator;
+ // Local fields for left/right incoming - may be replaced when reading from spilled
+ private RecordBatch buildBatch;
+ private RecordBatch probeBatch;
+
+ // For handling spilling
+ private SpillSet spillSet;
+ HashJoinPOP popConfig;
+
+ private int cycleNum = 0; // primary, secondary, tertiary, etc.
+ private int originalPartition = -1; // the partition a secondary reads from
+ IntVector read_HV_vector; // HV vector that was read from the spilled batch
+ private int MAX_BATCHES_IN_MEMORY;
+ private int MAX_BATCHES_PER_PARTITION;
+
+ public class inMemBatchCounter {
+ private int inMemBatches;
+ public void inc() { inMemBatches++; }
+ public void dec() { inMemBatches--; }
+ public int value() { return inMemBatches; }
+ }
+ public inMemBatchCounter inMemBatches = new inMemBatchCounter();
- // Generator mapping for the build side
- // Generator mapping for the build side : scalar
- private static final GeneratorMapping PROJECT_BUILD =
- GeneratorMapping.create("doSetup"/* setup method */, "projectBuildRecord" /* eval method */, null /* reset */,
- null /* cleanup */);
- // Generator mapping for the build side : constant
- private static final GeneratorMapping PROJECT_BUILD_CONSTANT = GeneratorMapping.create("doSetup"/* setup method */,
- "doSetup" /* eval method */,
- null /* reset */, null /* cleanup */);
-
- // Generator mapping for the probe side : scalar
- private static final GeneratorMapping PROJECT_PROBE =
- GeneratorMapping.create("doSetup" /* setup method */, "projectProbeRecord" /* eval method */, null /* reset */,
- null /* cleanup */);
- // Generator mapping for the probe side : constant
- private static final GeneratorMapping PROJECT_PROBE_CONSTANT = GeneratorMapping.create("doSetup" /* setup method */,
- "doSetup" /* eval method */,
- null /* reset */, null /* cleanup */);
-
-
- // Mapping set for the build side
- private final MappingSet projectBuildMapping =
- new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */, "buildBatch" /* read container */,
- "outgoing" /* write container */, PROJECT_BUILD_CONSTANT, PROJECT_BUILD);
-
- // Mapping set for the probe side
- private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
- "probeBatch" /* read container */,
- "outgoing" /* write container */,
- PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
-
- // indicates if we have previously returned an output batch
- boolean firstOutputBatch = true;
+ private static class HJSpilledPartition {
+ public int innerSpilledBatches;
+ public String innerSpillFile;
+ public int outerSpilledBatches;
+ public String outerSpillFile;
+ int cycleNum;
+ int origPartn;
+ int prevOrigPartn; }
- private final HashTableStats htStats = new HashTableStats();
+ private ArrayList<HJSpilledPartition> spilledPartitionsList;
+ private HJSpilledPartition spilledInners[]; // for the outer to find the partition
+ private int operatorId; // for the spill file name
public enum Metric implements MetricDef {
NUM_BUCKETS,
NUM_ENTRIES,
NUM_RESIZING,
- RESIZING_TIME_MS;
+ RESIZING_TIME_MS,
+ NUM_PARTITIONS,
+ SPILLED_PARTITIONS, // number of original partitions spilled to disk
+ SPILL_MB, // Number of MB of data spilled to disk. This amount is first written,
+ // then later re-read. So, disk I/O is twice this amount.
+ SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+ ;
// duplicate for hash ag
@Override
- public int metricId() {
- return ordinal();
- }
+ public int metricId() { return ordinal(); }
}
@Override
@@ -169,31 +166,16 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
// Initialize the hash join helper context
- hjHelper = new HashJoinHelper(context, oContext.getAllocator());
- try {
- rightSchema = right.getSchema();
- final VectorContainer vectors = new VectorContainer(oContext);
- for (final VectorWrapper<?> w : right) {
- vectors.addOrGet(w.getField());
- }
- vectors.buildSchema(SelectionVectorMode.NONE);
- vectors.setRecordCount(0);
- hyperContainer = new ExpandableHyperContainer(vectors);
- hjHelper.addNewBatch(0);
- buildBatchIndex++;
- if (isFurtherProcessingRequired(rightUpstream) && this.right.getRecordCount() > 0) {
- setupHashTable();
- }
- hashJoinProbe = setupHashJoinProbe();
- // Build the container schema and set the counts
- for (final VectorWrapper<?> w : container) {
- w.getValueVector().allocateNew();
- }
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- container.setRecordCount(outputRecords);
- } catch (IOException | ClassTransformationException e) {
- throw new SchemaChangeException(e);
+ if (rightUpstream != IterOutcome.NONE) {
+ setupHashTable();
}
+ setupOutputContainerSchema();
+ // Build the container schema and set the counts
+ for (final VectorWrapper<?> w : container) {
+ w.getValueVector().allocateNew();
+ }
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setRecordCount(outputRecords);
}
@Override
@@ -205,19 +187,21 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
if (state == BatchState.FIRST) {
// Build the hash table, using the build side record batches.
executeBuildPhase();
- hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, left.getRecordCount(), this, hashTable,
- hjHelper, joinType, leftUpstream);
// Update the hash table related stats for the operator
- updateStats(this.hashTable);
+ updateStats();
+ //
+ setupProbe();
}
// Store the number of records projected
- if ((hashTable != null && !hashTable.isEmpty()) || joinType != JoinRelType.INNER) {
+
+ if ( ! buildSideIsEmpty || // If there are build-side rows
+ joinType != JoinRelType.INNER) { // or if this is a left/full outer join
// Allocate the memory for the vectors in the output container
allocateVectors();
- outputRecords = hashJoinProbe.probeAndProject();
+ outputRecords = probeAndProject();
/* We are here because of one the following
* 1. Completed processing of all the records and we are done
@@ -235,34 +219,94 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return IterOutcome.OK;
}
+
+ // Free all partitions' in-memory data structures
+ // (In case need to start processing spilled partitions)
+ for ( HashPartition partn : partitions ) {
+ partn.close();
+ }
+
+ //
+ // (recursively) Handle the spilled partitions, if any
+ //
+ if ( !buildSideIsEmpty && !wasKilled && !spilledPartitionsList.isEmpty()) {
+ // Get the next (previously) spilled partition to handle as incoming
+ HJSpilledPartition currSp = spilledPartitionsList.remove(0);
+
+ // Create a BUILD-side "incoming" out of the inner spill file of that partition
+ buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, rightSchema, oContext, spillSet);
+ // The above ctor call also got the first batch; need to update the outcome
+ rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
+
+ if ( currSp.outerSpilledBatches > 0 ) {
+ // Create a PROBE-side "incoming" out of the outer spill file of that partition
+ probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+ // The above ctor call also got the first batch; need to update the outcome
+ leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
+ } else {
+ probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
+ leftUpstream = IterOutcome.NONE;
+ changeToFinalProbeState();
+ }
+
+ // update the cycle num if needed
+ // The current cycle num should always be one larger than in the spilled partition
+ if (cycleNum == currSp.cycleNum) {
+ cycleNum = 1 + currSp.cycleNum;
+ stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
+ // report first spill or memory stressful situations
+ if (cycleNum == 1) { logger.info("Started reading spilled records "); }
+ if (cycleNum == 2) { logger.info("SECONDARY SPILLING "); }
+ if (cycleNum == 3) { logger.warn("TERTIARY SPILLING "); }
+ if (cycleNum == 4) { logger.warn("QUATERNARY SPILLING "); }
+ if (cycleNum == 5) { logger.warn("QUINARY SPILLING "); }
+ if ( cycleNum * bitsInMask > 20 ) {
+ spilledPartitionsList.add(currSp); // so cleanup() would delete the curr spill files
+ this.cleanup();
+ throw UserException
+ .unsupportedError()
+ .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)")
+ .build(logger);
+ }
+ }
+ logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size());
+
+ state = BatchState.FIRST; // build again, initialize probe, etc
+
+ return innerNext(); // start processing the next spilled partition "recursively"
+ }
+
} else {
// Our build side is empty, we won't have any matches, clear the probe side
if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (final VectorWrapper<?> wrapper : left) {
+ for (final VectorWrapper<?> wrapper : probeBatch) {
wrapper.getValueVector().clear();
}
- left.kill(true);
- leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
+ probeBatch.kill(true);
+ leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (final VectorWrapper<?> wrapper : left) {
+ for (final VectorWrapper<?> wrapper : probeBatch) {
wrapper.getValueVector().clear();
}
- leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
+ leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
}
}
}
// No more output records, clean up and return
state = BatchState.DONE;
+
+ this.cleanup();
+
return IterOutcome.NONE;
- } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+ } catch (SchemaChangeException e) {
context.getExecutorState().fail(e);
killIncoming(false);
return IterOutcome.STOP;
}
}
- public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+ private void setupHashTable() throws SchemaChangeException {
// Setup the hash table configuration object
int conditionsSize = conditions.size();
final List<NamedExpression> rightExpr = new ArrayList<>(conditionsSize);
@@ -278,43 +322,103 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
leftExpr = null;
} else {
- if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
- final String errorMsg = new StringBuilder()
- .append("Hash join does not support probe batch with selection vectors. ")
- .append("Probe batch has selection mode = ")
- .append(left.getSchema().getSelectionVectorMode())
- .toString();
+ if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+ final String errorMsg = new StringBuilder()
+ .append("Hash join does not support probe batch with selection vectors. ")
+ .append("Probe batch has selection mode = ")
+ .append(probeBatch.getSchema().getSelectionVectorMode())
+ .toString();
throw new SchemaChangeException(errorMsg);
}
}
+ final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
- final HashTableConfig htConfig =
- new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
- HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
// Create the chained hash table
- final ChainedHashTable ht =
- new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
- hashTable = ht.createAndSetupHashTable(null, 1);
+ baseHashTable =
+ new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null);
}
- public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
- //Setup the underlying hash table
+ /**
+ * Call only after num partitions is known
+ */
+ private void delayedSetup() {
+ //
+ // Find out the estimated max batch size, etc
+ // and compute the max numPartitions possible
+ //
+ // numPartitions = 8; // just for initial work; change later
+ // partitionMask = 7;
+ // bitsInMask = 3;
+
+ // SET FROM CONFIGURATION OPTIONS :
+ // ================================
+
+ // Set the number of partitions from the configuration (raise to a power of two, if needed)
+ numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
+ if ( numPartitions == 1 ) { // a single partition means spilling is disabled
+ canSpill = false;
+ logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
+ }
+ numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+ // Based on the number of partitions: Set the mask and bit count
+ partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+ bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+ RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
+
+ MAX_BATCHES_IN_MEMORY = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
+ MAX_BATCHES_PER_PARTITION = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR);
+
+ // =================================
+
+ // Create the FIFO list of spilled partitions (pairs - inner/outer)
+ spilledPartitionsList = new ArrayList<>();
+
+ // Create array for the partitions
+ partitions = new HashPartition[numPartitions];
+
+ buildSideIsEmpty = false;
+ }
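// Editor's illustrative sketch (assumed value; not part of this patch): how the
// configured partition count is turned into a mask and a bit count, mirroring the
// delayedSetup() code above.
int n = BaseAllocator.nextPowerOfTwo(30);   // a setting of 30 is rounded up to 32
int mask = n - 1;                           // 32 -> 0x1F
int bits = Integer.bitCount(mask);          // 0x1F -> 5 bits select a partition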
+
+ /**
+ * Initialize fields (that may be reused when reading spilled partitions)
+ */
+ private void initializeBuild() {
+ assert inMemBatches.value() == 0; // check that no in-memory batches left
+ baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
+ // Recreate the partitions every time build is initialized
+ for (int part = 0; part < numPartitions; part++ ) {
+ partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
+ RECORDS_PER_BATCH, spillSet, part, inMemBatches, cycleNum);
+ }
+
+ spilledInners = new HJSpilledPartition[numPartitions];
+
+ }
+ /**
+ * Execute the BUILD phase; first read incoming and split rows into partitions;
+ * may decide to spill some of the partitions
+ *
+ * @throws SchemaChangeException
+ */
+ public void executeBuildPhase() throws SchemaChangeException {
+ final HashJoinMemoryCalculator.BuildSidePartitioning buildCalc = new HashJoinMemoryCalculatorImpl().next();
+ boolean hasProbeData = leftUpstream != IterOutcome.NONE;
+
+ if ( rightUpstream == IterOutcome.NONE ) { return; } // empty right
// skip first batch if count is zero, as it may be an empty schema batch
- if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
- for (final VectorWrapper<?> w : right) {
+ if (false && buildBatch.getRecordCount() == 0) {
+ for (final VectorWrapper<?> w : buildBatch) {
w.clear();
}
- rightUpstream = next(right);
- if (isFurtherProcessingRequired(rightUpstream) &&
- right.getRecordCount() > 0 && hashTable == null) {
- setupHashTable();
- }
+ rightUpstream = next(buildBatch);
}
- boolean moreData = true;
+ //Setup the underlying hash table
+ if ( cycleNum == 0 ) { delayedSetup(); } // first time only
+
+ initializeBuild();
+ boolean moreData = true;
while (moreData) {
switch (rightUpstream) {
case OUT_OF_MEMORY:
@@ -325,90 +429,123 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
continue;
case OK_NEW_SCHEMA:
- if (rightSchema == null) {
- rightSchema = right.getSchema();
-
- if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
- final String errorMsg = new StringBuilder()
- .append("Hash join does not support build batch with selection vectors. ")
- .append("Build batch has selection mode = ")
- .append(left.getSchema().getSelectionVectorMode())
- .toString();
-
- throw new SchemaChangeException(errorMsg);
- }
- setupHashTable();
- } else {
- if (!rightSchema.equals(right.getSchema())) {
- throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, right.getSchema());
- }
- hashTable.updateBatches();
+ if (!rightSchema.equals(buildBatch.getSchema())) {
+ throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, buildBatch.getSchema());
}
+ for (HashPartition partn : partitions) { partn.updateBatches(); }
// Fall through
case OK:
- final int currentRecordCount = right.getRecordCount();
+ final int currentRecordCount = buildBatch.getRecordCount();
+
+ if ( cycleNum > 0 ) {
+ read_HV_vector = (IntVector) buildBatch.getContainer().getLast();
+ }
+ // For every record in the build batch, hash the key columns and keep the result
+ for (int ind = 0; ind < currentRecordCount; ind++) {
+ int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
+ : read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
+ int currPart = hashCode & partitionMask ;
+ hashCode >>>= bitsInMask;
+/*
+ int pos = currentBatches[currPart].appendRow(buildBatch.getContainer(),ind);
+ currHVVectors[currPart].getMutator().set(pos, hashCode); // store the hash value in the new column
+ if ( pos + 1 == RECORDS_PER_BATCH ) {
+ // The current decision on when-to-spill is crude
+ completeAnInnerBatch(currPart,true,
+ isSpilled(currPart) || // once spilled - then spill every new full batch
+ canSpill &&
+ ( inMemBatches > MAX_BATCHES_IN_MEMORY ||
+ tmpBatchesList[currPart].size() > MAX_BATCHES_PER_PARTITION ));
+ }
+*/
+
+ // Append the new inner row to the appropriate partition; spill (that partition) if needed
+ partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode,
+ // The current decision on when-to-spill is crude ...
+ partitions[currPart].isSpilled() || // once spilled - then spill every new full batch
+ canSpill &&
+ ( inMemBatches.value() > MAX_BATCHES_IN_MEMORY ||
+ partitions[currPart].getPartitionBatchesCount() > MAX_BATCHES_PER_PARTITION ) ); // may spill if needed
+ }
+
+ if ( read_HV_vector != null ) {
+ read_HV_vector.clear();
+ read_HV_vector = null;
+ }
+ break;
+ }
+ // Get the next incoming record batch
+ rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
+ }
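// Editor's illustrative sketch (made-up hash value; not part of this patch): how a
// build-side hash code is split - the low bitsInMask bits pick the partition, and the
// remaining bits are kept (in the HV column) for deeper spill cycles and the hash tables.
int sketchHash = 0x5A7C159E;                  // full 32-bit hash of the join key
int sketchPartition = sketchHash & 0x1F;      // low 5 bits select one of 32 partitions
int sketchRemainingHash = sketchHash >>> 5;   // stored alongside the row, reused later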
+
+ // Move the remaining current batches into their temp lists, or spill
+ // them if the partition is spilled. Add the spilled partitions into
+ // the spilled partitions list
+ for (HashPartition partn : partitions) {
+ partn.completeAnInnerBatch(false, partn.isSpilled() );
+ if ( partn.isSpilled() ) {
+ HJSpilledPartition sp = new HJSpilledPartition();
+ sp.innerSpillFile = partn.getSpillFile();
+ sp.innerSpilledBatches = partn.getPartitionBatchesCount();
+ sp.cycleNum = cycleNum; // remember the current cycle
+ sp.origPartn = partn.getPartitionNum(); // for debugging / filename
+ sp.prevOrigPartn = originalPartition; // for debugging / filename
+ spilledPartitionsList.add(sp);
+
+ spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
+ partn.closeWriter();
+ }
+ }
+
+ //
+ // Traverse all the in-memory partitions' incoming batches, and build their hash tables
+ //
+/*
+ for (int currPart = 0; currPart < numPartitions; currPart++) {
+
+ // each partition is a regular array of batches
+ ArrayList<VectorContainer> thisPart = new ArrayList<>();
- /* For every new build batch, we store some state in the helper context
- * Add new state to the helper context
- */
- hjHelper.addNewBatch(currentRecordCount);
+ for (int curr = 0; curr < partitionBatchesCount[currPart]; curr++) {
+ VectorContainer nextBatch = tmpBatchesList[currPart].get(curr);
+ final int currentRecordCount = nextBatch.getRecordCount();
+
+ // For every incoming build batch, we create a matching helper batch
+ hjHelpers[currPart].addNewBatch(currentRecordCount);
// Holder contains the global index where the key is hashed into using the hash table
final IndexPointer htIndex = new IndexPointer();
- // For every record in the build batch , hash the key columns
- for (int i = 0; i < currentRecordCount; i++) {
- int hashCode = hashTable.getHashCode(i);
- try {
- hashTable.put(i, htIndex, hashCode);
- } catch (RetryAfterSpillException RE) { throw new OutOfMemoryException("HT put");} // Hash Join can not retry yet
- /* Use the global index returned by the hash table, to store
- * the current record index and batch index. This will be used
- * later when we probe and find a match.
- */
- hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
- }
+ hashTables[currPart].updateIncoming(nextBatch, probeBatch );
- /* Completed hashing all records in this batch. Transfer the batch
- * to the hyper vector container. Will be used when we want to retrieve
- * records that have matching keys on the probe side.
- */
- final RecordBatchData nextBatch = new RecordBatchData(right, oContext.getAllocator());
- boolean success = false;
- try {
- if (hyperContainer == null) {
- hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
- } else {
- hyperContainer.addBatch(nextBatch.getContainer());
- }
+ IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector();
- // completed processing a batch, increment batch index
- buildBatchIndex++;
- success = true;
- } finally {
- if (!success) {
- nextBatch.clear();
- }
+ for (int recInd = 0; recInd < currentRecordCount; recInd++) {
+ int hashCode = HV_vector.getAccessor().get(recInd);
+ try {
+ hashTables[currPart].put(recInd, htIndex, hashCode);
+ } catch (RetryAfterSpillException RE) {
+ throw new OutOfMemoryException("HT put");
+ } // Hash Join can not retry yet
+ // Use the global index returned by the hash table, to store
+ //the current record index and batch index. This will be used
+ // later when we probe and find a match.
+ //
+ hjHelpers[currPart].setCurrentIndex(htIndex.value, curr , recInd);
}
- break;
+
+ thisPart.add(nextBatch);
}
- // Get the next record batch
- rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
- }
- }
- public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
- final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
- cg.plainJavaCapable(true);
- final ClassGenerator<HashJoinProbe> g = cg.getRoot();
+ partitionContainers.add(thisPart);
+*/
+ for (HashPartition partn : partitions) {
+ partn.buildContainersHashTableAndHelper();
+ }
- // Generate the code to project build side records
- g.setMappingSet(projectBuildMapping);
+ }
- int fieldId = 0;
- final JExpression buildIndex = JExpr.direct("buildIndex");
- final JExpression outIndex = JExpr.direct("outIndex");
- g.rotateBlock();
+ private void setupOutputContainerSchema() {
if (rightSchema != null) {
for (final MaterializedField field : rightSchema) {
@@ -427,27 +564,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
final MaterializedField projected = field.withType(outputType);
// Add the vector to our output container
container.addOrGet(projected);
-
- final JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
- final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
- g.getEvalBlock().add(outVV.invoke("copyFromSafe")
- .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
- .arg(outIndex)
- .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
- g.rotateBlock();
- fieldId++;
}
}
- // Generate the code to project probe side records
- g.setMappingSet(projectProbeMapping);
-
- int outputFieldId = fieldId;
- fieldId = 0;
- final JExpression probeIndex = JExpr.direct("probeIndex");
-
if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
- for (final VectorWrapper<?> vv : left) {
+ for (final VectorWrapper<?> vv : probeBatch) {
final MajorType inputType = vv.getField().getType();
final MajorType outputType;
@@ -465,25 +586,22 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
vv.getValueVector().makeTransferPair(v);
v.clear();
}
-
- final JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
- final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
-
- g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
- g.rotateBlock();
- fieldId++;
- outputFieldId++;
}
}
- final HashJoinProbe hj = context.getImplementationClass(cg);
- return hj;
}
private void allocateVectors() {
for (final VectorWrapper<?> v : container) {
v.getValueVector().allocateNew();
}
+ container.setRecordCount(0); // reset container's counter back to zero records
+ }
+
+ // (Called after the inner side was fully read) - has that inner partition spilled?
+ public boolean isSpilledInner(int part) {
+ if ( spilledInners == null ) { return false; } // empty inner
+ return spilledInners[part] != null;
}
public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
@@ -491,56 +609,379 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
RecordBatch right /*Build side record batch*/
) throws OutOfMemoryException {
super(popConfig, context, true, left, right);
+ this.buildBatch = right;
+ this.probeBatch = left;
joinType = popConfig.getJoinType();
conditions = popConfig.getConditions();
+ this.popConfig = popConfig;
comparators = Lists.newArrayListWithExpectedSize(conditions.size());
+ // When DRILL supports Java 8, use the following instead of the for() loop
+ // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
for (int i=0; i<conditions.size(); i++) {
JoinCondition cond = conditions.get(i);
comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
}
+ this.allocator = oContext.getAllocator();
+
+ final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
+
+ if (memLimit != 0) {
+ allocator.setLimit(memLimit);
+ }
+
+ RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
+ maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
+
+ spillSet = new SpillSet(context, popConfig);
+
+ // Create an empty partitions array (in the ctor - covers the case where the right side is empty)
+ partitions = new HashPartition[0];
}
- private void updateStats(HashTable htable) {
- if (htable == null) {
- return;
+ public void cleanup() {
+ if ( buildSideIsEmpty ) { return; } // not set up; nothing to clean
+ if ( spillSet.getWriteBytes() > 0 ) {
+ stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+ (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+ }
+ // clean (and deallocate) each partition
+ for (HashPartition partn : partitions) {
+ partn.clearHashTableAndHelper();
+ partn.closeWriterAndDeleteFile();
+ }
+
+ // delete any spill file left in unread spilled partitions
+ while ( ! spilledPartitionsList.isEmpty() ) {
+ HJSpilledPartition sp = spilledPartitionsList.remove(0);
+ try {
+ spillSet.delete(sp.innerSpillFile);
+ } catch(IOException e) {
+ logger.warn("Cleanup: Failed to delete spill file {}",sp.innerSpillFile);
+ }
+ try { // outer file is added later; may be null if cleaning prematurely
+ if ( sp.outerSpillFile != null ) { spillSet.delete(sp.outerSpillFile); }
+ } catch(IOException e) {
+ logger.warn("Cleanup: Failed to delete spill file {}",sp.outerSpillFile);
+ }
+ }
+ // Delete the currently handled (if any) spilled files
+ spillSet.close(); // delete the spill directory(ies)
+ }
+
+ private void updateStats() {
+ if ( buildSideIsEmpty ) { return; } // no stats when the right side is empty
+ if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+ long numSpilled = 0;
+ HashTableStats newStats = new HashTableStats();
+ // sum the stats from all the partitions
+ for ( HashPartition partn : partitions ) {
+ if ( partn.isSpilled() ) { numSpilled++; }
+ partn.getStats(newStats);
+ htStats.addStats(newStats);
}
- htable.getStats(htStats);
- stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
- stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
- stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
- stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
+
+ this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+ this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+ this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+ this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
+ this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
+ this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
+ this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
}
@Override
public void killIncoming(boolean sendUpstream) {
- left.kill(sendUpstream);
- right.kill(sendUpstream);
+ wasKilled = true;
+ probeBatch.kill(sendUpstream);
+ buildBatch.kill(sendUpstream);
}
@Override
public void close() {
- if (hjHelper != null) {
- hjHelper.clear();
+ for ( HashPartition partn : partitions ) {
+ partn.close();
}
+ cleanup();
+ super.close();
+ }
+
+ // ==============================================================
+ //
+ // Methods used for the probe
+ //
+ // ============================================================
+ private BatchSchema probeSchema;
+
+ enum ProbeState {
+ PROBE_PROJECT, PROJECT_RIGHT, DONE
+ }
+
+ private int currRightPartition = 0; // for returning RIGHT/FULL
+
+ // Number of records to process on the probe side
+ private int recordsToProcess = 0;
+
+ // Number of records processed on the probe side
+ private int recordsProcessed = 0;
+
+ // Indicate if we should drain the next record from the probe side
+ private boolean getNextRecord = true;
+
+ // Contains both batch idx and record idx of the matching record in the build side
+ private int currentCompositeIdx = -1;
+
+ // Current state the hash join algorithm is in
+ private ProbeState probeState = ProbeState.PROBE_PROJECT;
- // If we didn't receive any data, hyperContainer may be null, check before clearing
- if (hyperContainer != null) {
- hyperContainer.clear();
+ // For outer or right joins, this is a list of unmatched records that needs to be projected
+ private List<Integer> unmatchedBuildIndexes = null;
+
+ // While probing duplicates, retain current build-side partition in case need to continue
+ // probing later on the same chain of duplicates
+ private HashPartition currPartition;
+
+ /**
+ * Various initialization needed to perform the probe
+ * Must be called AFTER the build completes
+ */
+ private void setupProbe() {
+ currRightPartition = 0; // In case it's a Right/Full outer join
+ recordsProcessed = 0;
+ recordsToProcess = 0;
+
+ probeSchema = probeBatch.getSchema();
+ probeState = ProbeState.PROBE_PROJECT;
+
+ // A special case - if the left was an empty file
+ if ( leftUpstream == IterOutcome.NONE ){
+ changeToFinalProbeState();
+ } else {
+ this.recordsToProcess = probeBatch.getRecordCount();
}
- if (hashTable != null) {
- hashTable.clear();
+ // For those outer partitions that need spilling (because their matching inners spilled),
+ // initialize those partitions' current batches and hash-value vectors
+ for ( HashPartition partn : partitions ) {
+ partn.allocateNewCurrentBatchAndHV();
+ }
+
+ if ( cycleNum > 0 ) {
+ if ( read_HV_vector != null ) { read_HV_vector.clear();}
+ if ( leftUpstream != IterOutcome.NONE ) { // Skip when outer spill was empty
+ read_HV_vector = (IntVector) probeBatch.getContainer().getLast();
+ }
+ }
+ }
+
+ private void executeProjectRightPhase(int currBuildPart) {
+ while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
+ outputRecords =
+ container.appendRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
+ null /* no probeBatch */, 0 /* no probe index */ );
+ recordsProcessed++;
+ }
+ }
+
+ private void executeProbePhase() throws SchemaChangeException {
+
+ while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
+
+ // Check if we have processed all records in this batch; if so, we need to invoke next
+ if (recordsProcessed == recordsToProcess) {
+
+ // Done processing all records in the previous batch, clean up!
+ for (VectorWrapper<?> wrapper : probeBatch) {
+ wrapper.getValueVector().clear();
+ }
+
+ IterOutcome leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
+
+ switch (leftUpstream) {
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ recordsProcessed = 0;
+ recordsToProcess = 0;
+ changeToFinalProbeState();
+ // in case some outer partitions were spilled, we need to spill their last batches
+ for ( HashPartition partn : partitions ) {
+ if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
+ partn.completeAnOuterBatch(false);
+ // update the partition's spill record with the outer side
+ HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
+ sp.outerSpillFile = partn.getSpillFile();
+ sp.outerSpilledBatches = partn.getPartitionBatchesCount();
+
+ partn.closeWriter();
+ }
+
+ continue;
+
+ case OK_NEW_SCHEMA:
+ if (probeBatch.getSchema().equals(probeSchema)) {
+ for ( HashPartition partn : partitions ) { partn.updateBatches(); }
+
+ } else {
+ throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
+ probeSchema,
+ probeBatch.getSchema());
+ }
+ case OK:
+ recordsToProcess = probeBatch.getRecordCount();
+ recordsProcessed = 0;
+ // If we received an empty batch do nothing
+ if (recordsToProcess == 0) {
+ continue;
+ }
+ if ( cycleNum > 0 ) {
+ read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
+ }
+ }
+ }
+ int probeIndex = -1;
+ // Check if we need to drain the next row in the probe side
+ if (getNextRecord) {
+
+ if ( !buildSideIsEmpty ) {
+ int hashCode = ( cycleNum == 0 ) ?
+ partitions[0].getProbeHashCode(recordsProcessed)
+ : read_HV_vector.getAccessor().get(recordsProcessed);
+ int currBuildPart = hashCode & partitionMask ;
+ hashCode >>>= bitsInMask;
+
+ // Set and keep the current partition (may be used again on subsequent probe calls as
+ // inner rows of duplicate key are processed)
+ currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
+
+ // If the matching inner partition was spilled
+ if ( isSpilledInner(currBuildPart) ) {
+ // add this row to its outer partition (may cause a spill, when the batch is full)
+
+ currPartition.appendOuterRow(hashCode, recordsProcessed);
+
+ recordsProcessed++; // done with this outer record
+ continue; // on to the next outer record
+ }
+
+ probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
+
+ }
+
+ if (probeIndex != -1) {
+
+ /* The current probe record has a key that matches. Get the index
+ * of the first row in the build side that matches the current key
+ * (and record this match in the bitmap, in case of a FULL/RIGHT join)
+ */
+ currentCompositeIdx = currPartition.getStartIndex(probeIndex);
+
+ outputRecords =
+ container.appendRow(currPartition.getContainers(), currentCompositeIdx,
+ probeBatch.getContainer(), recordsProcessed);
+
+ /* Projected single row from the build side with matching key but there
+ * may be more rows with the same key. Check if that's the case
+ */
+ currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+ if (currentCompositeIdx == -1) {
+ /* We only had one row in the build side that matched the current key
+ * from the probe side. Drain the next row in the probe side.
+ */
+ recordsProcessed++;
+ } else {
+ /* There is more than one row with the same key on the build side
+ * don't drain more records from the probe side till we have projected
+ * all the rows with this key
+ */
+ getNextRecord = false;
+ }
+ } else { // No matching key
+
+ // If we have a left outer join, project the outer side
+ if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+
+ outputRecords =
+ container.appendOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition);
+ }
+ recordsProcessed++;
+ }
+ }
+ else { // match the next inner row with the same key
+
+ currPartition.setRecordMatched(currentCompositeIdx);
+
+ outputRecords =
+ container.appendRow(currPartition.getContainers(), currentCompositeIdx,
+ probeBatch.getContainer(), recordsProcessed);
+
+ currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+
+ if (currentCompositeIdx == -1) {
+ // We don't have any more rows matching the current key on the build side, move on to the next probe row
+ getNextRecord = true;
+ recordsProcessed++;
+ }
+ }
}
- super.close();
}
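// Editor's condensed sketch of the duplicate-key handling above (not part of this
// patch; it ignores the getNextRecord flag, which lets the probe pause mid-chain when
// the outgoing batch fills up). Draining one probe row against its chain of matching
// build rows amounts to:
int idx = currPartition.getStartIndex(probeIndex);   // first matching build row (also marked matched)
while (idx != -1) {
  container.appendRow(currPartition.getContainers(), idx,
      probeBatch.getContainer(), recordsProcessed);
  idx = currPartition.getNextIndex(idx);             // next build row with the same key, or -1
  if (idx != -1) { currPartition.setRecordMatched(idx); }
}
recordsProcessed++;                                  // done with this probe row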
/**
- * This method checks to see if join processing should be continued further.
- * @param upStream up stream operator status.
- * @@return true if up stream status is OK or OK_NEW_SCHEMA otherwise false.
+ * Perform the probe and project the results
+ *
+ * @return number of output records
+ * @throws SchemaChangeException
*/
- private boolean isFurtherProcessingRequired(IterOutcome upStream) {
- return upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA;
+ private int probeAndProject() throws SchemaChangeException {
+
+ outputRecords = 0;
+
+ // When handling spilled partitions, the state becomes DONE at the end of each partition
+ if ( probeState == ProbeState.DONE ) {
+ return outputRecords; // that is zero
+ }
+
+ if (probeState == ProbeState.PROBE_PROJECT) {
+ executeProbePhase();
+ }
+
+ if (probeState == ProbeState.PROJECT_RIGHT) {
+ // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
+
+ do {
+
+ if (unmatchedBuildIndexes == null) { // first time for this partition ?
+ if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right
+ // Get this partition's list of build indexes that didn't match any record on the probe side
+ unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
+ recordsProcessed = 0;
+ recordsToProcess = unmatchedBuildIndexes.size();
+ }
+
+ // Project the list of unmatched records on the build side
+ executeProjectRightPhase(currRightPartition);
+
+ if ( recordsProcessed < recordsToProcess ) { // more records in this partition?
+ return outputRecords; // outgoing is full; report and come back later
+ } else {
+ currRightPartition++; // on to the next right partition
+ unmatchedBuildIndexes = null;
+ }
+
+ } while ( currRightPartition < numPartitions );
+
+ probeState = ProbeState.DONE; // last right partition was handled; we are done now
+ }
+
+ return outputRecords;
}
-}
+
+ private void changeToFinalProbeState() {
+ // We are done with the (left) probe phase.
+ // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side
+ probeState =
+ (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
+ ProbeState.DONE; // else we're done
+ }
+
+} // public class HashJoinBatch
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index e8d747e..55146f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -103,6 +103,9 @@ public class HashJoinHelper {
public BitSet getKeyMatchBitVector() {
return keyMatchBitVector;
}
+ public void clear() {
+ keyMatchBitVector.clear();
+ }
}
public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
@@ -231,5 +234,6 @@ public class HashJoinHelper {
for (BuildInfo info : buildInfoList) {
info.getLinks().clear();
}
+ buildInfoList.clear();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
deleted file mode 100644
index 36d9baa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.join;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.common.HashTable;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.calcite.rel.core.JoinRelType;
-
-public interface HashJoinProbe {
- TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
-
- /* The probe side of the hash join can be in the following two states
- * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
- * key match and if we do we project the record
- * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
- * from the build side that did not match any records on the probe side. For Left outer
- * case we handle it internally by projecting the record if there isn't a match on the build side
- * 3. DONE: Once we have projected all possible records we are done
- */
- enum ProbeState {
- PROBE_PROJECT, PROJECT_RIGHT, DONE
- }
-
- void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
- int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
- JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState);
- void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
- int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
- void projectBuildRecord(int buildIndex, int outIndex);
- void projectProbeRecord(int probeIndex, int outIndex);
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
deleted file mode 100644
index 1a85277..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.join;
-
-import java.io.IOException;
-import java.util.List;
-
-import javax.inject.Named;
-
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.common.HashTable;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.calcite.rel.core.JoinRelType;
-
-public abstract class HashJoinProbeTemplate implements HashJoinProbe {
-
- // Probe side record batch
- private RecordBatch probeBatch;
-
- private BatchSchema probeSchema;
-
- private VectorContainer buildBatch;
-
- // Join type, INNER, LEFT, RIGHT or OUTER
- private JoinRelType joinType;
-
- private HashJoinBatch outgoingJoinBatch = null;
-
- private static final int TARGET_RECORDS_PER_BATCH = 4000;
-
- /* Helper class
- * Maintains linked list of build side records with the same key
- * Keeps information about which build records have a corresponding
- * matching key in the probe side (for outer, right joins)
- */
- private HashJoinHelper hjHelper = null;
-
- // Underlying hashtable used by the hash join
- private HashTable hashTable = null;
-
- // Number of records to process on the probe side
- private int recordsToProcess = 0;
-
- // Number of records processed on the probe side
- private int recordsProcessed = 0;
-
- // Number of records in the output container
- private int outputRecords;
-
- // Indicate if we should drain the next record from the probe side
- private boolean getNextRecord = true;
-
- // Contains both batch idx and record idx of the matching record in the build side
- private int currentCompositeIdx = -1;
-
- // Current state the hash join algorithm is in
- private ProbeState probeState = ProbeState.PROBE_PROJECT;
-
- // For outer or right joins, this is a list of unmatched records that needs to be projected
- private List<Integer> unmatchedBuildIndexes = null;
-
- @Override
- public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
- int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable,
- HashJoinHelper hjHelper, JoinRelType joinRelType, IterOutcome leftStartState) {
-
- this.probeBatch = probeBatch;
- this.probeSchema = probeBatch.getSchema();
- this.buildBatch = buildBatch;
- this.joinType = joinRelType;
- this.recordsToProcess = probeRecordCount;
- this.hashTable = hashTable;
- this.hjHelper = hjHelper;
- this.outgoingJoinBatch = outgoing;
-
- if (leftStartState == IterOutcome.NONE) {
- if (joinRelType == JoinRelType.RIGHT) {
- probeState = ProbeState.PROJECT_RIGHT;
- } else {
- probeState = ProbeState.DONE;
- }
- }
-
- doSetup(context, buildBatch, probeBatch, outgoing);
- }
-
- public void executeProjectRightPhase() {
- while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
- projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
- recordsProcessed++;
- outputRecords++;
- }
- }
-
- public void executeProbePhase() throws SchemaChangeException {
- while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
-
- // Check if we have processed all records in this batch we need to invoke next
- if (recordsProcessed == recordsToProcess) {
-
- // Done processing all records in the previous batch, clean up!
- for (VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
-
- IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
-
- switch (leftUpstream) {
- case NONE:
- case NOT_YET:
- case STOP:
- recordsProcessed = 0;
- recordsToProcess = 0;
- probeState = ProbeState.DONE;
-
- // We are done with the probe phase. If its a RIGHT or a FULL join get the unmatched indexes from the build side
- if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
- probeState = ProbeState.PROJECT_RIGHT;
- }
-
- continue;
-
- case OK_NEW_SCHEMA:
- if (probeBatch.getSchema().equals(probeSchema)) {
- doSetup(outgoingJoinBatch.getContext(), buildBatch, probeBatch, outgoingJoinBatch);
- if (hashTable != null) {
- hashTable.updateBatches();
- }
- } else {
- throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
- probeSchema,
- probeBatch.getSchema());
- }
- case OK:
- recordsToProcess = probeBatch.getRecordCount();
- recordsProcessed = 0;
- // If we received an empty batch do nothing
- if (recordsToProcess == 0) {
- continue;
- }
- }
- }
- int probeIndex = -1;
-
- // Check if we need to drain the next row in the probe side
- if (getNextRecord) {
- if (hashTable != null && !hashTable.isEmpty()) {
- probeIndex = hashTable.containsKey(recordsProcessed, true);
- }
-
- if (probeIndex != -1) {
-
- /* The current probe record has a key that matches. Get the index
- * of the first row in the build side that matches the current key
- */
- currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
-
- /* Record in the build side at currentCompositeIdx has a matching record in the probe
- * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
- * join we keep track of which records we need to project at the end
- */
- hjHelper.setRecordMatched(currentCompositeIdx);
-
- projectBuildRecord(currentCompositeIdx, outputRecords);
- projectProbeRecord(recordsProcessed, outputRecords);
- outputRecords++;
- /* Projected single row from the build side with matching key but there
- * may be more rows with the same key. Check if that's the case
- */
- currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
- if (currentCompositeIdx == -1) {
- /* We only had one row in the build side that matched the current key
- * from the probe side. Drain the next row in the probe side.
- */
- recordsProcessed++;
- } else {
- /* There is more than one row with the same key on the build side
- * don't drain more records from the probe side till we have projected
- * all the rows with this key
- */
- getNextRecord = false;
- }
- } else { // No matching key
-
- // If we have a left outer join, project the keys
- if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
- projectProbeRecord(recordsProcessed, outputRecords);
- outputRecords++;
- }
- recordsProcessed++;
- }
- } else {
- hjHelper.setRecordMatched(currentCompositeIdx);
- projectBuildRecord(currentCompositeIdx, outputRecords);
- projectProbeRecord(recordsProcessed, outputRecords);
- outputRecords++;
-
- currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-
- if (currentCompositeIdx == -1) {
- // We don't have any more rows matching the current key on the build side, move on to the next probe row
- getNextRecord = true;
- recordsProcessed++;
- }
- }
- }
- }
-
- public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
-
- outputRecords = 0;
-
- if (probeState == ProbeState.PROBE_PROJECT) {
- executeProbePhase();
- }
-
- if (probeState == ProbeState.PROJECT_RIGHT) {
-
- // We are here because we have a RIGHT OUTER or a FULL join
- if (unmatchedBuildIndexes == null) {
- // Initialize list of build indexes that didn't match a record on the probe side
- unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
- recordsToProcess = unmatchedBuildIndexes.size();
- recordsProcessed = 0;
- }
-
- // Project the list of unmatched records on the build side
- executeProjectRightPhase();
- }
-
- return outputRecords;
- }
-
- public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
- @Named("outgoing") RecordBatch outgoing);
- public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
-
- public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 2f9ab14..d0421f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.cache.VectorSerializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
@@ -432,6 +433,10 @@ public class SpillSet {
operName = "HashAgg";
spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM);
dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS);
+ } else if (popConfig instanceof HashJoinPOP) {
+ operName = "HashJoin";
+ spillFs = config.getString(ExecConstants.HASHJOIN_SPILL_FILESYSTEM);
+ dirList = config.getStringList(ExecConstants.HASHJOIN_SPILL_DIRS);
} else {
// just use the common ones
operName = "Unknown";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 9da8a4b..433e0c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -220,6 +220,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
+ @Override
+ public VectorContainer getContainer() {
+ return batchLoader.getContainer();
+ }
+
private void informSenders() {
logger.info("Informing senders of request to terminate sending.");
final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 05eb545..378c980 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -359,6 +359,11 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
this.getClass().getCanonicalName()));
}
+ @Override
+ public VectorContainer getContainer() {
+ return incoming.getContainer();
+ }
+
public RecordBatch getIncoming() { return incoming; }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 9d383c1..054ceec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -228,4 +228,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
+
+ @Override
+ public VectorContainer getContainer() {
+ return container;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index fe7f9e9..ded4351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -255,6 +255,13 @@ public interface RecordBatch extends VectorAccessible {
VectorContainer getOutgoingContainer();
/**
+ * Return the internal vector container
+ *
+ * @return The internal vector container
+ */
+ public VectorContainer getContainer();
+
+ /**
* Gets the value vector type and ID for the given schema path. The
* TypedFieldId should store a fieldId which is the same as the ordinal
* position of the field within the Iterator provided this class's
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
index 40447be..9dfa129 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
@@ -104,4 +104,7 @@ public class SchemalessBatch implements CloseableRecordBatch {
public void close() throws Exception {
// This is present to match BatchCreator#getBatch() returning type.
}
+
+ @Override
+ public VectorContainer getContainer() { return null; }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index b6b7b21..4063e55 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -94,4 +94,9 @@ public class SimpleRecordBatch implements RecordBatch {
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
+
+ @Override
+ public VectorContainer getContainer() {
+ return container;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 03e9249..06a89b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.record;
import java.lang.reflect.Array;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -42,7 +43,10 @@ public class VectorContainer implements VectorAccessible {
private final BufferAllocator allocator;
protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
private BatchSchema schema;
- private int recordCount = -1;
+
+ private int recordCount = 0;
+ private boolean initialized = false;
+ // private BufferAllocator allocator;
private boolean schemaChanged = true; // Schema has changed since last built. Must rebuild schema
public VectorContainer() {
@@ -209,6 +213,109 @@ public class VectorContainer implements VectorAccessible {
}
}
+ /**
+ * This works with non-hyper {@link VectorContainer}s which have no selection vectors.
+ * Appends a row taken from a source {@link VectorContainer} to this {@link VectorContainer}.
+ * @param srcContainer The {@link VectorContainer} to copy a row from.
+ * @param srcIndex The index of the row to copy from the source {@link VectorContainer}.
+ * @return Position where the row was appended
+ */
+ public int appendRow(VectorContainer srcContainer, int srcIndex) {
+ for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
+ ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+ ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector();
+ destVector.copyEntry(recordCount, srcVector, srcIndex);
+ }
+ int pos = recordCount++;
+ initialized = true;
+ return pos;
+ }
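// Editor's usage sketch (dest and src are hypothetical containers built with the same
// schema; not part of this patch): appendRow copies one row and returns the position
// at which it was appended.
int pos = dest.appendRow(src, 7);            // copy row 7 of src onto the end of dest
assert pos == dest.getRecordCount() - 1;     // the returned position is the new last row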
+
+ /**
+ * This method is currently used only by the Hash Join to return a row composed of build+probe rows
+ *
+ * This works with non-hyper {@link VectorContainer}s which have no selection vectors.
+ * Appends a row taken from two source {@link VectorContainer}s to this {@link VectorContainer}.
+ * @param buildSrcContainer The {@link VectorContainer} to copy the first columns of a row from.
+ * @param buildSrcIndex The index of the row to copy from the build side source {@link VectorContainer}.
+ * @param probeSrcContainer The {@link VectorContainer} to copy the last columns of a row from.
+ * @param probeSrcIndex The index of the row to copy from the probe side source {@link VectorContainer}.
+ * @return Number of records in the container after appending
+ */
+ public int appendRowXXX(VectorContainer buildSrcContainer, int buildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) {
+ if ( buildSrcContainer != null ) {
+ for (int vectorIndex = 0; vectorIndex < buildSrcContainer.wrappers.size(); vectorIndex++) {
+ ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+ ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector();
+ destVector.copyEntry(recordCount, srcVector, buildSrcIndex);
+ }
+ }
+ if ( probeSrcContainer != null ) {
+ int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size();
+ for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) {
+ ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+ ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex).getValueVector();
+ destVector.copyEntry(recordCount, srcVector, probeSrcIndex);
+ }
+ }
+ recordCount++;
+ initialized = true;
+ return recordCount;
+ }
+
+ private int appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) {
+ // "- 1" to skip the last "hash values" added column
+ int lastIndex = buildSrcContainer.wrappers.size() - 1 ;
+ for (int vectorIndex = 0; vectorIndex < lastIndex; vectorIndex++) {
+ ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+ ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector();
+ destVector.copyEntry(recordCount, srcVector, buildSrcIndex);
+ }
+ return lastIndex;
+ }
+ private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex, int baseIndex) {
+ // int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size();
+ for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) {
+ ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+ ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex - baseIndex).getValueVector();
+ destVector.copyEntry(recordCount, srcVector, probeSrcIndex);
+ }
+ }
+ /**
+ * A special version of appendRow for the HashJoin; uses a composite index for the build side
+ * @param buildSrcContainers The containers list for the right side
+ * @param compositeBuildSrcIndex Composite build index
+ * @param probeSrcContainer The single container for the left/outer side
+ * @param probeSrcIndex Index in the outer container
+ * @return Number of rows in this container (after the append)
+ */
+ public int appendRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) {
+ int buildBatch = compositeBuildSrcIndex >>> 16;
+ int buildOffset = compositeBuildSrcIndex & 65535;
+ int baseInd = 0;
+ if ( buildSrcContainers != null ) { baseInd = appendBuild(buildSrcContainers.get(buildBatch), buildOffset); }
+ if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex, baseInd); }
+ recordCount++;
+ initialized = true;
+ return recordCount;
+ }
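// Editor's sketch (made-up values; not part of this patch): the composite build index
// packs the build batch number into the high 16 bits and the in-batch row offset into
// the low 16 bits, matching the decode at the top of this method.
int compositeIdx = (3 << 16) | 42;           // row 42 of build batch 3
int decodedBatch = compositeIdx >>> 16;      // -> 3
int decodedOffset = compositeIdx & 65535;    // -> 42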
+
+ /**
+ * A customized version of the special appendRow for HashJoin - used for a Left
+ * Outer Join row that has no build side match - hence a base index into
+ * this container's wrappers is needed, from which to start appending
+ * @param probeSrcContainer The probe side source {@link VectorContainer} to copy the row from
+ * @param probeSrcIndex The index of the row to copy from the probe side source
+ * @param baseInd index of this container's wrapper to start at
+ * @return Number of records in the container after appending
+ */
+ public int appendOuterRow(VectorContainer probeSrcContainer, int probeSrcIndex, int baseInd) {
+ appendProbe(probeSrcContainer, probeSrcIndex, baseInd);
+ recordCount++;
+ initialized = true;
+ return recordCount;
+ }
+
public TypedFieldId add(ValueVector vv) {
schemaChanged = true;
schema = null;
@@ -217,6 +324,11 @@ public class VectorContainer implements VectorAccessible {
return new TypedFieldId(vv.getField().getType(), i);
}
+ public ValueVector getLast() {
+ int sz = wrappers.size();
+ if ( sz == 0 ) { return null; }
+ return wrappers.get(sz - 1).getValueVector();
+ }
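// Editor's sketch (spilledBatch is a hypothetical RecordBatch read back from a spill
// file; not part of this patch): the Hash Join uses getLast() to fetch the hash-value
// (HV) column carried as the last vector of each spilled batch.
IntVector hv = (IntVector) spilledBatch.getContainer().getLast();
int hashOfFirstRow = hv.getAccessor().get(0);   // hash value precomputed for row 0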
public void add(ValueVector[] hyperVector) {
add(hyperVector, true);
}
@@ -343,7 +455,8 @@ public class VectorContainer implements VectorAccessible {
}
public void setRecordCount(int recordCount) {
- this.recordCount = recordCount;
+ this.recordCount = recordCount;
+ initialized = true;
}
@Override
@@ -352,7 +465,7 @@ public class VectorContainer implements VectorAccessible {
return recordCount;
}
- public boolean hasRecordCount() { return recordCount != -1; }
+ public boolean hasRecordCount() { return initialized; }
@Override
public SelectionVector2 getSelectionVector2() {
@@ -418,6 +531,7 @@ public class VectorContainer implements VectorAccessible {
merged.wrappers.addAll(wrappers);
merged.wrappers.addAll(otherContainer.wrappers);
merged.schemaChanged = false;
+ merged.initialized = true;
return merged;
}
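
For reference, the composite build index consumed by the HashJoin-specific appendRow above packs a build-batch number into its upper 16 bits and a row offset into its lower 16 bits (hence the ">>> 16" and "& 65535"). A minimal sketch of forming and decomposing such an index, assuming both fields fit in 16 bits; the class and method names are illustrative only, not part of this patch:

// Illustrative sketch; mirrors the decoding done in the composite-index appendRow()
final class CompositeIndexSketch {
  // Pack a build-batch number and a row offset into one int (each limited to 16 bits)
  static int compose(int batchIndex, int rowOffset) {
    return (batchIndex << 16) | (rowOffset & 0xFFFF);
  }
  static int batchOf(int composite)  { return composite >>> 16; }   // which build batch
  static int offsetOf(int composite) { return composite & 0xFFFF; } // row within that batch
}
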
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 7f02773..810fd37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -117,6 +117,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(PlannerSettings.JOIN_OPTIMIZATION),
new OptionDefinition(PlannerSettings.ENABLE_UNNEST_LATERAL),
new OptionDefinition(PlannerSettings.FORCE_2PHASE_AGGR), // for testing
+ new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
+ new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR),
+ new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
+ new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR),
+ new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // for tuning
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b2013d5..f321691 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -244,6 +244,17 @@ drill.exec: {
// if they do not exist.
directories: [ "/tmp/drill/spill" ]
},
+ hashjoin: {
+ spill: {
+ // -- The 2 options below can be used to override the common ones
+ // -- (common to all spilling operators)
+ // File system to use. Local file system by default.
+ fs: ${drill.exec.spill.fs},
+ // List of directories to use. Directories are created
+ // if they do not exist.
+ directories: ${drill.exec.spill.directories},
+ }
+ },
hashagg: {
spill: {
// -- The 2 options below can be used to override the common ones
@@ -427,9 +438,16 @@ drill.exec.options: {
exec.enable_bulk_load_table_list: false,
exec.enable_union_type: false,
exec.errors.verbose: false,
+ exec.hashjoin.mem_limit: 0,
+ exec.hashjoin.num_partitions: 32,
+ exec.hashjoin.num_rows_in_batch: 1024,
+ exec.hashjoin.max_batches_in_memory: 128,
+ exec.hashjoin.max_batches_per_partition: 512,
exec.hashagg.mem_limit: 0,
exec.hashagg.min_batches_per_partition: 2,
exec.hashagg.num_partitions: 32,
+ exec.hashagg.num_rows_in_batch: 128,
+ exec.hashagg.max_batches_in_memory: 65536,
exec.hashagg.use_memory_prediction: true,
exec.impersonation.inbound_policies: "[]",
exec.java.compiler.exp_in_method_size: 50,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
new file mode 100644
index 0000000..4cfe6b4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SlowTest;
+
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+
+@Category({SlowTest.class, OperatorTest.class})
+public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ // Should spill, including recursive spill
+ public void testSimpleHashJoinSpill() {
+ HashJoinPOP joinConf = new HashJoinPOP(null, null,
+ Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch", 64);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory", 8);
+ // Put some duplicate values
+ List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a string\"}]",
+ "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : \"yet another\"}]");
+ List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a string\"}]",
+ "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : \"yet another\"}]");
+ int numRows = 2_500;
+ for ( int cnt = 1; cnt <= numRows; cnt++ ) {
+ leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
+ rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
+ }
+
+ opTestBuilder()
+ .physicalOperator(joinConf)
+ .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
+ .baselineColumns("lft", "a", "b", "rgt")
+ .expectedTotalRows( numRows + 9 ) // numRows matching keys, plus 3 x 3 matches on the duplicate key 0
+ .go();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRightOuterHashJoinSpill() {
+ HashJoinPOP joinConf = new HashJoinPOP(null, null,
+ Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.RIGHT);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch", 64);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory", 8);
+ // Put some duplicate values
+ List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a string\"}]",
+ "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : \"yet another\"}]");
+ List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a string\"}]",
+ "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : \"yet another\"}]");
+ int numRows = 8_000;
+ for ( int cnt = 1; cnt <= numRows; cnt++ ) {
+ // leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
+ rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
+ }
+
+ opTestBuilder()
+ .physicalOperator(joinConf)
+ .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
+ .baselineColumns("lft", "a", "b", "rgt")
+ .expectedTotalRows( numRows + 9 ) // 3 x 3 matches on key 0, plus numRows unmatched right-side rows
+ .go();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testLeftOuterHashJoinSpill() {
+ HashJoinPOP joinConf = new HashJoinPOP(null, null,
+ Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.LEFT);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 8);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch", 64);
+ operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory", 12);
+ // Put some duplicate values
+ List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a string\"}]",
+ "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : \"yet another\"}]");
+ List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a string\"}]",
+ "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : \"yet another\"}]");
+ int numRows = 4_000; // 100_000
+ for ( int cnt = 1; cnt <= numRows / 2 ; cnt++ ) { // inner use only half, to check the left-outer join
+ // leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
+ rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
+ }
+ for ( int cnt = 1; cnt <= numRows; cnt++ ) {
+ leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
+ // rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
+ }
+
+ opTestBuilder()
+ .physicalOperator(joinConf)
+ .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
+ .baselineColumns("lft", "a", "b", "rgt")
+ .expectedTotalRows( numRows + 9 )
+ .go();
+ }
+}
\ No newline at end of file
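
The "+ 9" in each expectedTotalRows above comes from the three duplicate key-0 rows seeded on each side: 3 x 3 = 9 join matches. A small sketch of the arithmetic for the left-outer test; the variable names are illustrative, and the figures follow from the data generated in the loops above:

// Left outer join row count, for numRows = 4_000
int duplicateMatches = 3 * 3;                  // three lft=0 rows x three rgt=0 rows
int matchedKeys      = numRows / 2;            // keys 1..2000 exist on both sides
int unmatchedLeft    = numRows - matchedKeys;  // keys 2001..4000 have no build-side match
int expectedTotal    = duplicateMatches + matchedKeys + unmatchedLeft;  // == numRows + 9
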
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index f45f558..6374f1f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -151,7 +151,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
}
Map<String, List<Object>> actualSuperVectors = new TreeMap<String, List<Object>>();
- int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, null, actualSuperVectors);
+ int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, null, actualSuperVectors, null);
if (expectBatchNum != null) {
if (expectBatchNum != actualBatchNum) {
throw new AssertionError(String.format("Expected %s batches from operator tree. But operators return %s batch!", expectBatchNum, actualBatchNum));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index 4bab883..16081be 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -216,6 +216,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
private boolean expectNoRows;
private Long expectedBatchSize;
private Integer expectedNumBatches;
+ private Integer expectedTotalRows;
@SuppressWarnings({"unchecked", "resource"})
public void go() {
@@ -235,7 +236,8 @@ public class PhysicalOpUnitTestBase extends ExecTest {
testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams);
- Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches);
+ Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches, expectedTotalRows);
+ if ( expectedTotalRows != null ) { return; } // when checking total rows, don't compare actual results
Map<String, List<Object>> expectedSuperVectors;
@@ -328,6 +330,11 @@ public class PhysicalOpUnitTestBase extends ExecTest {
this.expectedBatchSize = batchSize;
return this;
}
+
+ public OperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) {
+ this.expectedTotalRows = expectedTotalRows;
+ return this;
+ }
}
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index b5450e6..57bc79c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -315,15 +315,16 @@ public class DrillTestWrapper {
* Iterate over batches, and combine the batches into a map, where key is schema path, and value is
* the list of column values across all the batches.
* @param batches
+ * @param expectedTotalRecords If non-null, the expected total number of records across all batches (asserted at the end)
* @return
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches,
- Long expectedBatchSize, Integer expectedNumBatches)
+ Long expectedBatchSize, Integer expectedNumBatches, Integer expectedTotalRecords)
throws SchemaChangeException, UnsupportedEncodingException {
Map<String, List<Object>> combinedVectors = new TreeMap<>();
- addToCombinedVectorResults(batches, null, expectedBatchSize, expectedNumBatches, combinedVectors);
+ addToCombinedVectorResults(batches, null, expectedBatchSize, expectedNumBatches, combinedVectors, expectedTotalRecords);
return combinedVectors;
}
@@ -340,7 +341,7 @@ public class DrillTestWrapper {
*/
public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema,
Long expectedBatchSize, Integer expectedNumBatches,
- Map<String, List<Object>> combinedVectors)
+ Map<String, List<Object>> combinedVectors, Integer expectedTotalRecords)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
int numBatch = 0;
@@ -443,6 +444,9 @@ public class DrillTestWrapper {
Assert.assertTrue(numBatch <= (2*expectedNumBatches));
}
+ if ( expectedTotalRecords != null ) {
+ Assert.assertEquals(expectedTotalRecords.longValue(), totalRecords);
+ }
return numBatch;
}
@@ -562,7 +566,7 @@ public class DrillTestWrapper {
addTypeInfoIfMissing(actual.get(0), testBuilder);
BatchIterator batchIter = new BatchIterator(actual, loader);
- actualSuperVectors = addToCombinedVectorResults(batchIter, null, null);
+ actualSuperVectors = addToCombinedVectorResults(batchIter, null, null, null);
batchIter.close();
// If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
@@ -575,7 +579,7 @@ public class DrillTestWrapper {
test(baselineOptionSettingQueries);
expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
BatchIterator exBatchIter = new BatchIterator(expected, loader);
- expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, null);
+ expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, null, null);
exBatchIter.close();
}
} else {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
index 02156f6..a3cd918 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
@@ -105,4 +105,7 @@ public class RowSetBatch implements RecordBatch {
public Iterator<VectorWrapper<?>> iterator() {
return rowSet.container().iterator();
}
+
+ @Override
+ public VectorContainer getContainer() { return rowSet.container(); }
}
diff --git a/exec/java-exec/src/test/resources/empty.json b/exec/java-exec/src/test/resources/empty.json
new file mode 100644
index 0000000..e69de29
--
To stop receiving notification emails like this one, please contact
boaz@apache.org.