Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/20 05:01:50 UTC
[02/14] git commit: Fix alignment in Hash Join code
Fix alignment in Hash Join code
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cb0d46f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cb0d46f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cb0d46f6
Branch: refs/heads/master
Commit: cb0d46f69df2d89601f56b257f3f6c1d97eedc6e
Parents: 1f4276e
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 01:52:03 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Sun May 18 01:52:03 2014 -0700
----------------------------------------------------------------------
.../exec/physical/impl/join/HashJoinBatch.java | 634 +++++++++----------
.../impl/join/HashJoinBatchCreator.java | 10 +-
.../exec/physical/impl/join/HashJoinHelper.java | 278 ++++----
.../exec/physical/impl/join/HashJoinProbe.java | 38 +-
.../impl/join/HashJoinProbeTemplate.java | 310 ++++-----
5 files changed, 635 insertions(+), 635 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5eec3bb..9afc033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -59,397 +59,397 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
- // Probe side record batch
- private final RecordBatch left;
+ // Probe side record batch
+ private final RecordBatch left;
- // Build side record batch
- private final RecordBatch right;
+ // Build side record batch
+ private final RecordBatch right;
- // Join type, INNER, LEFT, RIGHT or OUTER
- private final JoinRelType joinType;
+ // Join type, INNER, LEFT, RIGHT or OUTER
+ private final JoinRelType joinType;
- // Join conditions
- private final List<JoinCondition> conditions;
+ // Join conditions
+ private final List<JoinCondition> conditions;
- // Runtime generated class implementing HashJoinProbe interface
- private HashJoinProbe hashJoinProbe = null;
+ // Runtime generated class implementing HashJoinProbe interface
+ private HashJoinProbe hashJoinProbe = null;
- /* Helper class
- * Maintains linked list of build side records with the same key
- * Keeps information about which build records have a corresponding
- * matching key in the probe side (for outer, right joins)
- */
- private HashJoinHelper hjHelper = null;
+ /* Helper class
+ * Maintains linked list of build side records with the same key
+ * Keeps information about which build records have a corresponding
+ * matching key in the probe side (for outer, right joins)
+ */
+ private HashJoinHelper hjHelper = null;
- // Underlying hashtable used by the hash join
- private HashTable hashTable = null;
+ // Underlying hashtable used by the hash join
+ private HashTable hashTable = null;
- /* Hyper container to store all build side record batches.
- * Records are retrieved from this container when there is a matching record
- * on the probe side
- */
- private ExpandableHyperContainer hyperContainer;
+ /* Hyper container to store all build side record batches.
+ * Records are retrieved from this container when there is a matching record
+ * on the probe side
+ */
+ private ExpandableHyperContainer hyperContainer;
- // Number of records in the output container
- private int outputRecords;
+ // Number of records in the output container
+ private int outputRecords;
- // Current batch index on the build side
- private int buildBatchIndex = 0;
+ // Current batch index on the build side
+ private int buildBatchIndex = 0;
- // List of vector allocators
- private List<VectorAllocator> allocators = null;
+ // List of vector allocators
+ private List<VectorAllocator> allocators = null;
- // Schema of the build side
- private BatchSchema rightSchema = null;
+ // Schema of the build side
+ private BatchSchema rightSchema = null;
- // Generator mapping for the build side
- private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
- "projectBuildRecord" /* eval method */,
- null /* reset */, null /* cleanup */);
+ // Generator mapping for the build side
+ private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
+ "projectBuildRecord" /* eval method */,
+ null /* reset */, null /* cleanup */);
- // Generator mapping for the probe side
- private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
- "projectProbeRecord" /* eval method */,
- null /* reset */, null /* cleanup */);
+ // Generator mapping for the probe side
+ private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
+ "projectProbeRecord" /* eval method */,
+ null /* reset */, null /* cleanup */);
- // Mapping set for the build side
- private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
- "buildBatch" /* read container */,
- "outgoing" /* write container */,
- PROJECT_BUILD, PROJECT_BUILD);
+ // Mapping set for the build side
+ private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
+ "buildBatch" /* read container */,
+ "outgoing" /* write container */,
+ PROJECT_BUILD, PROJECT_BUILD);
- // Mapping set for the probe side
- private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
- "probeBatch" /* read container */,
- "outgoing" /* write container */,
- PROJECT_PROBE, PROJECT_PROBE);
+ // Mapping set for the probe side
+ private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+ "probeBatch" /* read container */,
+ "outgoing" /* write container */,
+ PROJECT_PROBE, PROJECT_PROBE);
- // true until we have returned the first output batch
- boolean firstOutputBatch = true;
+ // true until we have returned the first output batch
+ boolean firstOutputBatch = true;
- IterOutcome leftUpstream = IterOutcome.NONE;
-
- @Override
- public int getRecordCount() {
- return outputRecords;
- }
+ IterOutcome leftUpstream = IterOutcome.NONE;
+ @Override
+ public int getRecordCount() {
+ return outputRecords;
+ }
- @Override
- public IterOutcome next() {
- try {
- /* If we are here for the first time, execute the build phase of the
- * hash join and set up the runtime-generated class for the probe side
- */
- if (hashJoinProbe == null) {
+ @Override
+ public IterOutcome next() {
- // Initialize the hash join helper context
- hjHelper = new HashJoinHelper(context, oContext.getAllocator());
+ try {
+ /* If we are here for the first time, execute the build phase of the
+ * hash join and set up the runtime-generated class for the probe side
+ */
+ if (hashJoinProbe == null) {
- /* Build phase requires setting up the hash table. Hash table will
- * materialize both the build and probe side expressions while
- * creating the hash table. So we need to invoke next() on our probe batch
- * as well, for the materialization to be successful. This batch will not be used
- * till we complete the build phase.
- */
- leftUpstream = left.next();
+ // Initialize the hash join helper context
+ hjHelper = new HashJoinHelper(context, oContext.getAllocator());
- // Build the hash table, using the build side record batches.
- executeBuildPhase();
+ /* Build phase requires setting up the hash table. Hash table will
+ * materialize both the build and probe side expressions while
+ * creating the hash table. So we need to invoke next() on our probe batch
+ * as well, for the materialization to be successful. This batch will not be used
+ * till we complete the build phase.
+ */
+ leftUpstream = left.next();
- // Create the run time generated code needed to probe and project
- hashJoinProbe = setupHashJoinProbe();
- }
+ // Build the hash table, using the build side record batches.
+ executeBuildPhase();
- // Store the number of records projected
- if (hashTable != null) {
-
- // Allocate the memory for the vectors in the output container
- allocateVectors();
-
- outputRecords = hashJoinProbe.probeAndProject();
-
- /* We are here because of one of the following
- * 1. Completed processing of all the records and we are done
- * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
- * In either case, build the output container's schema and return
- */
- if (outputRecords > 0) {
-
- // Build the container schema and set the counts
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- container.setRecordCount(outputRecords);
-
- for (VectorWrapper<?> v : container) {
- v.getValueVector().getMutator().setValueCount(outputRecords);
- }
-
- // First output batch, return OK_NEW_SCHEMA
- if (firstOutputBatch == true) {
- firstOutputBatch = false;
- return IterOutcome.OK_NEW_SCHEMA;
- }
-
- // Not the first output batch
- return IterOutcome.OK;
- }
- } else {
- // Our build side is empty, we won't have any matches, clear the probe side
- if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (VectorWrapper<?> wrapper : left) {
- wrapper.getValueVector().clear();
- }
- leftUpstream = left.next();
- while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (VectorWrapper<?> wrapper : left) {
- wrapper.getValueVector().clear();
- }
- leftUpstream = left.next();
- }
- }
- }
+ // Create the run time generated code needed to probe and project
+ hashJoinProbe = setupHashJoinProbe();
+ }
- // No more output records, clean up and return
- return IterOutcome.NONE;
+ // Store the number of records projected
+ if (hashTable != null) {
- } catch (ClassTransformationException | SchemaChangeException | IOException e) {
- context.fail(e);
- killIncoming();
- return IterOutcome.STOP;
- }
- }
+ // Allocate the memory for the vectors in the output container
+ allocateVectors();
- public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+ outputRecords = hashJoinProbe.probeAndProject();
- // Setup the hash table configuration object
- int conditionsSize = conditions.size();
+ /* We are here because of one of the following
+ * 1. Completed processing of all the records and we are done
+ * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+ * In either case, build the output container's schema and return
+ */
+ if (outputRecords > 0) {
- NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
- NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+ // Build the container schema and set the counts
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setRecordCount(outputRecords);
- // Create named expressions from the conditions
- for (int i = 0; i < conditionsSize; i++) {
- rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
- leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+ for (VectorWrapper<?> v : container) {
+ v.getValueVector().getMutator().setValueCount(outputRecords);
+ }
- // Hash join only supports equality currently.
- assert conditions.get(i).getRelationship().equals("==");
+ // First output batch, return OK_NEW_SCHEMA
+ if (firstOutputBatch == true) {
+ firstOutputBatch = false;
+ return IterOutcome.OK_NEW_SCHEMA;
}
- // Set the left named expression to be null if the probe batch is empty.
- if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
- leftExpr = null;
- } else {
- if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
- throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+ // Not the first output batch
+ return IterOutcome.OK;
+ }
+ } else {
+ // Our build side is empty, we won't have any matches, clear the probe side
+ if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+ for (VectorWrapper<?> wrapper : left) {
+ wrapper.getValueVector().clear();
+ }
+ leftUpstream = left.next();
+ while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+ for (VectorWrapper<?> wrapper : left) {
+ wrapper.getValueVector().clear();
+ }
+ leftUpstream = left.next();
}
}
+ }
- HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
+ // No more output records, clean up and return
+ return IterOutcome.NONE;
- // Create the chained hash table
- ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
- hashTable = ht.createAndSetupHashTable(null);
+ } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+ context.fail(e);
+ killIncoming();
+ return IterOutcome.STOP;
}
+ }
- public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
-
- // Set up the underlying hash table
- IterOutcome rightUpstream = right.next();
-
- boolean moreData = true;
-
- while (moreData) {
-
- switch (rightUpstream) {
-
- case NONE:
- case NOT_YET:
- case STOP:
- moreData = false;
- continue;
-
- case OK_NEW_SCHEMA:
- if (rightSchema == null) {
- rightSchema = right.getSchema();
-
- if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
- throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
- }
- setupHashTable();
- } else {
- throw new SchemaChangeException("Hash join does not support schema changes");
- }
- // Fall through
- case OK:
- int currentRecordCount = right.getRecordCount();
-
- /* For every new build batch, we store some state in the helper context
- * Add new state to the helper context
- */
- hjHelper.addNewBatch(currentRecordCount);
-
- // Holder for the global index at which the hash table stores the key
- IntHolder htIndex = new IntHolder();
-
- // For every record in the build batch, hash the key columns
- for (int i = 0; i < currentRecordCount; i++) {
-
- HashTable.PutStatus status = hashTable.put(i, htIndex);
-
- if (status != HashTable.PutStatus.PUT_FAILED) {
- /* Use the global index returned by the hash table, to store
- * the current record index and batch index. This will be used
- * later when we probe and find a match.
- */
- hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
- }
- }
-
- /* Completed hashing all records in this batch. Transfer the batch
- * to the hyper vector container. Will be used when we want to retrieve
- * records that have matching keys on the probe side.
- */
- RecordBatchData nextBatch = new RecordBatchData(right);
- if (hyperContainer == null) {
- hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
- } else {
- hyperContainer.addBatch(nextBatch.getContainer());
- }
-
- // completed processing a batch, increment batch index
- buildBatchIndex++;
- break;
- }
- // Get the next record batch
- rightUpstream = right.next();
- }
+ public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+
+ // Setup the hash table configuration object
+ int conditionsSize = conditions.size();
+
+ NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
+ NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+
+ // Create named expressions from the conditions
+ for (int i = 0; i < conditionsSize; i++) {
+ rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
+ leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+
+ // Hash join only supports equality currently.
+ assert conditions.get(i).getRelationship().equals("==");
}
- public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+ // Set the left named expression to be null if the probe batch is empty.
+ if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
+ leftExpr = null;
+ } else {
+ if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+ throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+ }
+ }
- allocators = new ArrayList<>();
+ HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
- final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- ClassGenerator<HashJoinProbe> g = cg.getRoot();
+ // Create the chained hash table
+ ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
+ hashTable = ht.createAndSetupHashTable(null);
+ }
- // Generate the code to project build side records
- g.setMappingSet(projectBuildMapping);
+ public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
+ // Set up the underlying hash table
+ IterOutcome rightUpstream = right.next();
- int fieldId = 0;
- JExpression buildIndex = JExpr.direct("buildIndex");
- JExpression outIndex = JExpr.direct("outIndex");
- g.rotateBlock();
+ boolean moreData = true;
- if (hyperContainer != null) {
- for(VectorWrapper<?> vv : hyperContainer) {
+ while (moreData) {
- MajorType inputType = vv.getField().getType();
- MajorType outputType;
- if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
- outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
- } else {
- outputType = inputType;
- }
+ switch (rightUpstream) {
- // Add the vector to our output container
- ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
- container.add(v);
- allocators.add(RemovingRecordBatch.getAllocator4(v));
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ moreData = false;
+ continue;
- JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
- g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
- .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
- .arg(outIndex)
- .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
+ case OK_NEW_SCHEMA:
+ if (rightSchema == null) {
+ rightSchema = right.getSchema();
- fieldId++;
+ if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+ throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
}
- }
- g.rotateBlock();
- g.getEvalBlock()._return(JExpr.TRUE);
+ setupHashTable();
+ } else {
+ throw new SchemaChangeException("Hash join does not support schema changes");
+ }
+ // Fall through
+ case OK:
+ int currentRecordCount = right.getRecordCount();
- // Generate the code to project probe side records
- g.setMappingSet(projectProbeMapping);
+ /* For every new build batch, we store some state in the helper context
+ * Add new state to the helper context
+ */
+ hjHelper.addNewBatch(currentRecordCount);
- int outputFieldId = fieldId;
- fieldId = 0;
- JExpression probeIndex = JExpr.direct("probeIndex");
- int recordCount = 0;
+ // Holder for the global index at which the hash table stores the key
+ IntHolder htIndex = new IntHolder();
- if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
- for (VectorWrapper<?> vv : left) {
+ // For every record in the build batch, hash the key columns
+ for (int i = 0; i < currentRecordCount; i++) {
- MajorType inputType = vv.getField().getType();
- MajorType outputType;
- if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
- outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
- } else {
- outputType = inputType;
- }
+ HashTable.PutStatus status = hashTable.put(i, htIndex);
- ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
- container.add(v);
- allocators.add(RemovingRecordBatch.getAllocator4(v));
+ if (status != HashTable.PutStatus.PUT_FAILED) {
+ /* Use the global index returned by the hash table, to store
+ * the current record index and batch index. This will be used
+ * later when we probe and find a match.
+ */
+ hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+ }
+ }
- JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
+ /* Completed hashing all records in this batch. Transfer the batch
+ * to the hyper vector container. Will be used when we want to retrieve
+ * records that have matching keys on the probe side.
+ */
+ RecordBatchData nextBatch = new RecordBatchData(right);
+ if (hyperContainer == null) {
+ hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+ } else {
+ hyperContainer.addBatch(nextBatch.getContainer());
+ }
- g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+ // completed processing a batch, increment batch index
+ buildBatchIndex++;
+ break;
+ }
+ // Get the next record batch
+ rightUpstream = right.next();
+ }
+ }
- fieldId++;
- outputFieldId++;
- }
- g.rotateBlock();
- g.getEvalBlock()._return(JExpr.TRUE);
+ public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+
+ allocators = new ArrayList<>();
+
+ final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ ClassGenerator<HashJoinProbe> g = cg.getRoot();
+
+ // Generate the code to project build side records
+ g.setMappingSet(projectBuildMapping);
+
+
+ int fieldId = 0;
+ JExpression buildIndex = JExpr.direct("buildIndex");
+ JExpression outIndex = JExpr.direct("outIndex");
+ g.rotateBlock();
- recordCount = left.getRecordCount();
+ if (hyperContainer != null) {
+ for(VectorWrapper<?> vv : hyperContainer) {
+
+ MajorType inputType = vv.getField().getType();
+ MajorType outputType;
+ if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+ outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+ } else {
+ outputType = inputType;
}
+ // Add the vector to our output container
+ ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
+ container.add(v);
+ allocators.add(RemovingRecordBatch.getAllocator4(v));
- HashJoinProbe hj = context.getImplementationClass(cg);
+ JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
+ g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
+ .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+ .arg(outIndex)
+ .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
- hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
- return hj;
+ fieldId++;
+ }
}
+ g.rotateBlock();
+ g.getEvalBlock()._return(JExpr.TRUE);
+
+ // Generate the code to project probe side records
+ g.setMappingSet(projectProbeMapping);
- private void allocateVectors(){
- for(VectorAllocator a : allocators){
- a.alloc(RecordBatch.MAX_BATCH_SIZE);
+ int outputFieldId = fieldId;
+ fieldId = 0;
+ JExpression probeIndex = JExpr.direct("probeIndex");
+ int recordCount = 0;
+
+ if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ for (VectorWrapper<?> vv : left) {
+
+ MajorType inputType = vv.getField().getType();
+ MajorType outputType;
+ if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+ outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+ } else {
+ outputType = inputType;
}
- }
- public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
- super(popConfig, context);
- this.left = left;
- this.right = right;
- this.joinType = popConfig.getJoinType();
- this.conditions = popConfig.getConditions();
- }
+ ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
+ container.add(v);
+ allocators.add(RemovingRecordBatch.getAllocator4(v));
+
+ JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
- @Override
- public void killIncoming() {
- this.left.kill();
- this.right.kill();
+ g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+
+ fieldId++;
+ outputFieldId++;
+ }
+ g.rotateBlock();
+ g.getEvalBlock()._return(JExpr.TRUE);
+
+ recordCount = left.getRecordCount();
}
- @Override
- public void cleanup() {
- hjHelper.clear();
- // If we didn't receive any data, hyperContainer may be null, check before clearing
- if (hyperContainer != null) {
- hyperContainer.clear();
- }
+ HashJoinProbe hj = context.getImplementationClass(cg);
- if (hashTable != null) {
- hashTable.clear();
- }
- super.cleanup();
- left.cleanup();
- right.cleanup();
+ hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
+ return hj;
+ }
+
+ private void allocateVectors(){
+ for(VectorAllocator a : allocators){
+ a.alloc(RecordBatch.MAX_BATCH_SIZE);
+ }
+ }
+
+ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
+ super(popConfig, context);
+ this.left = left;
+ this.right = right;
+ this.joinType = popConfig.getJoinType();
+ this.conditions = popConfig.getConditions();
+ }
+
+ @Override
+ public void killIncoming() {
+ this.left.kill();
+ this.right.kill();
+ }
+
+ @Override
+ public void cleanup() {
+ hjHelper.clear();
+
+ // If we didn't receive any data, hyperContainer may be null, check before clearing
+ if (hyperContainer != null) {
+ hyperContainer.clear();
+ }
+
+ if (hashTable != null) {
+ hashTable.clear();
}
+ super.cleanup();
+ left.cleanup();
+ right.cleanup();
+ }
}
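The generated projection code above decodes a composite build-side index: the low 16 bits select the record within a batch and the high 16 bits select the batch inside the hyper container, which is what the band(Character.MAX_VALUE) and shrz(16) expressions compute. A minimal sketch of that encoding, with illustrative names that are not Drill APIs:

    // Sketch of the composite-index layout used by the generated
    // projectBuildRecord code: high 16 bits = batch, low 16 bits = record.
    public final class CompositeIndexSketch {
      static final int SHIFT_SIZE = 16;                   // same as HashJoinHelper.SHIFT_SIZE
      static final int RECORD_MASK = Character.MAX_VALUE; // 0xFFFF

      static int encode(int batchIndex, int recordIndex) {
        return (batchIndex << SHIFT_SIZE) | (recordIndex & RECORD_MASK);
      }

      public static void main(String[] args) {
        int composite = encode(3, 1234);
        int batchIndex = composite >>> SHIFT_SIZE;        // 3
        int recordIndex = composite & RECORD_MASK;        // 1234
        System.out.println(batchIndex + ":" + recordIndex);
      }
    }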
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
index 19a4a29..d925958 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -29,9 +29,9 @@ import java.util.List;
public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
- @Override
- public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.size() == 2);
- return new HashJoinBatch(config, context, children.get(0), children.get(1));
- }
+ @Override
+ public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.size() == 2);
+ return new HashJoinBatch(config, context, children.get(0), children.get(1));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index b1ed07e..a634827 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -51,183 +51,183 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
*/
public class HashJoinHelper {
- /* List of start indexes. Stores the record and batch index of the first record
- * with a given key.
- */
- List<SelectionVector4> startIndices = new ArrayList<>();
-
- // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
- List<BuildInfo> buildInfoList = new ArrayList<>();
+ /* List of start indexes. Stores the record and batch index of the first record
+ * with a given key.
+ */
+ List<SelectionVector4> startIndices = new ArrayList<>();
- // Fragment context
- FragmentContext context;
- BufferAllocator allocator;
+ // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
+ List<BuildInfo> buildInfoList = new ArrayList<>();
- // Constant to indicate index is empty.
- static final int INDEX_EMPTY = -1;
+ // Fragment context
+ FragmentContext context;
+ BufferAllocator allocator;
- // bits to shift while obtaining batch index from SV4
- static final int SHIFT_SIZE = 16;
+ // Constant to indicate index is empty.
+ static final int INDEX_EMPTY = -1;
- public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
- this.context = context;
- this.allocator = allocator;
- }
+ // bits to shift while obtaining batch index from SV4
+ static final int SHIFT_SIZE = 16;
- public void addStartIndexBatch() throws SchemaChangeException {
- startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
- }
+ public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
+ this.context = context;
+ this.allocator = allocator;
+}
- public class BuildInfo {
- // List of links. Logically it helps maintain a linked list of records with the same key value
- private SelectionVector4 links;
+ public void addStartIndexBatch() throws SchemaChangeException {
+ startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
+ }
- // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
- private BitSet keyMatchBitVector;
+ public class BuildInfo {
+ // List of links. Logically it helps maintain a linked list of records with the same key value
+ private SelectionVector4 links;
- // number of records in this batch
- private int recordCount;
+ // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
+ private BitSet keyMatchBitVector;
- public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
- this.links = links;
- this.keyMatchBitVector = keyMatchBitVector;
- this.recordCount = recordCount;
- }
+ // number of records in this batch
+ private int recordCount;
- public SelectionVector4 getLinks() {
- return links;
- }
+ public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
+ this.links = links;
+ this.keyMatchBitVector = keyMatchBitVector;
+ this.recordCount = recordCount;
+ }
- public BitSet getKeyMatchBitVector() {
- return keyMatchBitVector;
- }
+ public SelectionVector4 getLinks() {
+ return links;
}
- public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
+ public BitSet getKeyMatchBitVector() {
+ return keyMatchBitVector;
+ }
+ }
- ByteBuf vector = allocator.buffer((recordCount * 4));
+ public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
- SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
+ ByteBuf vector = allocator.buffer((recordCount * 4));
- // Initialize the vector
- for (int i = 0; i < recordCount; i++) {
- sv4.set(i, INDEX_EMPTY);
- }
+ SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
- return sv4;
+ // Initialize the vector
+ for (int i = 0; i < recordCount; i++) {
+ sv4.set(i, INDEX_EMPTY);
}
- public void addNewBatch(int recordCount) throws SchemaChangeException {
- // Add a node to the list of BuildInfo's
- BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
- buildInfoList.add(info);
- }
+ return sv4;
+ }
- public int getStartIndex(int keyIndex) {
- int batchIdx = keyIndex / HashTable.BATCH_SIZE;
- int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+ public void addNewBatch(int recordCount) throws SchemaChangeException {
+ // Add a node to the list of BuildInfo's
+ BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
+ buildInfoList.add(info);
+ }
- assert batchIdx < startIndices.size();
+ public int getStartIndex(int keyIndex) {
+ int batchIdx = keyIndex / HashTable.BATCH_SIZE;
+ int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
- SelectionVector4 sv4 = startIndices.get(batchIdx);
+ assert batchIdx < startIndices.size();
- return sv4.get(offsetIdx);
- }
+ SelectionVector4 sv4 = startIndices.get(batchIdx);
- public int getNextIndex(int currentIdx) {
- // Get to the links field of the current index to get the next index
- int batchIdx = currentIdx >>> SHIFT_SIZE;
- int recordIdx = currentIdx & HashTable.BATCH_MASK;
+ return sv4.get(offsetIdx);
+ }
- assert batchIdx < buildInfoList.size();
+ public int getNextIndex(int currentIdx) {
+ // Get to the links field of the current index to get the next index
+ int batchIdx = currentIdx >>> SHIFT_SIZE;
+ int recordIdx = currentIdx & HashTable.BATCH_MASK;
- // Get the corresponding BuildInfo node
- BuildInfo info = buildInfoList.get(batchIdx);
- return info.getLinks().get(recordIdx);
- }
+ assert batchIdx < buildInfoList.size();
- public List<Integer> getNextUnmatchedIndex() {
- List<Integer> compositeIndexes = new ArrayList<>();
+ // Get the corresponding BuildInfo node
+ BuildInfo info = buildInfoList.get(batchIdx);
+ return info.getLinks().get(recordIdx);
+ }
- for (int i = 0; i < buildInfoList.size(); i++) {
- BuildInfo info = buildInfoList.get(i);
- int fromIndex = 0;
+ public List<Integer> getNextUnmatchedIndex() {
+ List<Integer> compositeIndexes = new ArrayList<>();
- while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
- compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
- fromIndex++;
- }
- }
- return compositeIndexes;
+ for (int i = 0; i < buildInfoList.size(); i++) {
+ BuildInfo info = buildInfoList.get(i);
+ int fromIndex = 0;
+
+ while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
+ compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
+ fromIndex++;
+ }
}
+ return compositeIndexes;
+ }
- public void setRecordMatched(int index) {
- int batchIdx = index >>> SHIFT_SIZE;
- int recordIdx = index & HashTable.BATCH_MASK;
+ public void setRecordMatched(int index) {
+ int batchIdx = index >>> SHIFT_SIZE;
+ int recordIdx = index & HashTable.BATCH_MASK;
- // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
- BuildInfo info = buildInfoList.get(batchIdx);
- BitSet bitVector = info.getKeyMatchBitVector();
+ // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
+ BuildInfo info = buildInfoList.get(batchIdx);
+ BitSet bitVector = info.getKeyMatchBitVector();
- bitVector.set(recordIdx);
- }
+ bitVector.set(recordIdx);
+ }
- public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
+ public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
- /* set the current record batch index and the index
- * within the batch at the specified keyIndex. The keyIndex
- * denotes the global index where the key for this record is
- * stored in the hash table
+ /* set the current record batch index and the index
+ * within the batch at the specified keyIndex. The keyIndex
+ * denotes the global index where the key for this record is
+ * stored in the hash table
+ */
+ int batchIdx = keyIndex / HashTable.BATCH_SIZE;
+ int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+ if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
+ // allocate a new batch
+ addStartIndexBatch();
+ }
+
+ SelectionVector4 startIndex = startIndices.get(batchIdx);
+ int linkIndex;
+
+ // If head of the list is empty, insert current index at this position
+ if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
+ startIndex.set(offsetIdx, batchIndex, recordIndex);
+ } else {
+ /* The head of this list is not empty; if the first link
+ * is empty, insert there
+ */
+ batchIdx = linkIndex >>> SHIFT_SIZE;
+ offsetIdx = linkIndex & Character.MAX_VALUE;
+
+ SelectionVector4 link = buildInfoList.get(batchIdx).getLinks();
+ int firstLink = link.get(offsetIdx);
+
+ if (firstLink == INDEX_EMPTY) {
+ link.set(offsetIdx, batchIndex, recordIndex);
+ } else {
+ /* Insert the current value as the first link and
+ * make the current first link as its next
*/
- int batchIdx = keyIndex / HashTable.BATCH_SIZE;
- int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
-
- if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
- // allocate a new batch
- addStartIndexBatch();
- }
-
- SelectionVector4 startIndex = startIndices.get(batchIdx);
- int linkIndex;
-
- // If head of the list is empty, insert current index at this position
- if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
- startIndex.set(offsetIdx, batchIndex, recordIndex);
- } else {
- /* The head of this list is not empty; if the first link
- * is empty, insert there
- */
- batchIdx = linkIndex >>> SHIFT_SIZE;
- offsetIdx = linkIndex & Character.MAX_VALUE;
-
- SelectionVector4 link = buildInfoList.get(batchIdx).getLinks();
- int firstLink = link.get(offsetIdx);
-
- if (firstLink == INDEX_EMPTY) {
- link.set(offsetIdx, batchIndex, recordIndex);
- } else {
- /* Insert the current value as the first link and
- * make the current first link as its next
- */
- int firstLinkBatchIdx = firstLink >>> SHIFT_SIZE;
- int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE;
-
- SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks();
- nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx);
-
- link.set(offsetIdx, batchIndex, recordIndex);
- }
- }
+ int firstLinkBatchIdx = firstLink >>> SHIFT_SIZE;
+ int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE;
+
+ SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks();
+ nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx);
+
+ link.set(offsetIdx, batchIndex, recordIndex);
+ }
}
+ }
- public void clear() {
- // Clear the SV4 used for start indices
- for (SelectionVector4 sv4: startIndices) {
- sv4.clear();
- }
+ public void clear() {
+ // Clear the SV4 used for start indices
+ for (SelectionVector4 sv4: startIndices) {
+ sv4.clear();
+ }
- for (BuildInfo info : buildInfoList) {
- info.getLinks().clear();
- }
+ for (BuildInfo info : buildInfoList) {
+ info.getLinks().clear();
}
+ }
}
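setCurrentIndex() above chains all build-side records that share a key: startIndices holds the head composite index for each hash-table slot, and each BuildInfo's links SV4 holds the next pointer for every record. Restated with plain maps purely for clarity (a hypothetical model; the real code keeps these chains in off-heap SelectionVector4 buffers), a new record is spliced in right behind the head:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: the chain maintenance performed by setCurrentIndex(),
    // modeled with HashMaps instead of SelectionVector4 buffers.
    final class ChainSketch {
      final Map<Integer, Integer> heads = new HashMap<>(); // keyIndex -> head composite
      final Map<Integer, Integer> links = new HashMap<>(); // composite -> next composite

      void insert(int keyIndex, int composite) {
        Integer head = heads.get(keyIndex);
        if (head == null) {
          heads.put(keyIndex, composite);   // chain was empty: new head
        } else {
          Integer second = links.get(head); // record currently linked after the head
          links.put(head, composite);       // head now points at the new record
          if (second != null) {
            links.put(composite, second);   // new record points at the old second
          }
        }
      }
    }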
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 160d352..6d20f60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -33,25 +33,25 @@ import org.eigenbase.rel.JoinRelType;
import java.io.IOException;
public interface HashJoinProbe {
- public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+ public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
- /* The probe side of the hash join can be in the following three states
- * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
- * key match and if we do we project the record
- * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
- * from the build side that did not match any records on the probe side. For Left outer
- * case we handle it internally by projecting the record if there isn't a match on the build side
- * 3. DONE: Once we have projected all possible records we are done
- */
- public static enum ProbeState {
- PROBE_PROJECT, PROJECT_RIGHT, DONE
- }
+ /* The probe side of the hash join can be in the following three states
+ * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
+ * key match and if we do we project the record
+ * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
+ * from the build side that did not match any records on the probe side. For Left outer
+ * case we handle it internally by projecting the record if there isn't a match on the build side
+ * 3. DONE: Once we have projected all possible records we are done
+ */
+ public static enum ProbeState {
+ PROBE_PROJECT, PROJECT_RIGHT, DONE
+ }
- public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
- int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
- JoinRelType joinRelType);
- public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
- public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
- public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
- public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
+ public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+ int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
+ JoinRelType joinRelType);
+ public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
+ public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
+ public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
+ public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
}
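The ProbeState enum above is a small state machine: PROBE_PROJECT drains probe-side batches; once they are exhausted, RIGHT and FULL joins pass through PROJECT_RIGHT to emit the unmatched build records; everything ends in DONE. A hedged sketch of the transitions implied by the template code below (the helper method itself is hypothetical, not part of this interface):

    // Sketch of the ProbeState transitions implied by HashJoinProbeTemplate.
    static HashJoinProbe.ProbeState next(HashJoinProbe.ProbeState state,
                                         boolean probeExhausted,
                                         boolean rightOrFullJoin) {
      switch (state) {
        case PROBE_PROJECT:
          if (!probeExhausted) {
            return HashJoinProbe.ProbeState.PROBE_PROJECT; // keep probing
          }
          return rightOrFullJoin ? HashJoinProbe.ProbeState.PROJECT_RIGHT
                                 : HashJoinProbe.ProbeState.DONE;
        case PROJECT_RIGHT: // after the unmatched build records are emitted
        default:
          return HashJoinProbe.ProbeState.DONE;
      }
    }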
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index a3e3b74..2a8be54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -35,196 +35,196 @@ import java.util.List;
public abstract class HashJoinProbeTemplate implements HashJoinProbe {
- // Probe side record batch
- private RecordBatch probeBatch;
+ // Probe side record batch
+ private RecordBatch probeBatch;
- // Join type, INNER, LEFT, RIGHT or OUTER
- private JoinRelType joinType;
+ // Join type, INNER, LEFT, RIGHT or OUTER
+ private JoinRelType joinType;
- /* Helper class
- * Maintains linked list of build side records with the same key
- * Keeps information about which build records have a corresponding
- * matching key in the probe side (for outer, right joins)
- */
- private HashJoinHelper hjHelper = null;
+ /* Helper class
+ * Maintains linked list of build side records with the same key
+ * Keeps information about which build records have a corresponding
+ * matching key in the probe side (for outer, right joins)
+ */
+ private HashJoinHelper hjHelper = null;
- // Underlying hashtable used by the hash join
- private HashTable hashTable = null;
+ // Underlying hashtable used by the hash join
+ private HashTable hashTable = null;
- // Number of records to process on the probe side
- private int recordsToProcess = 0;
+ // Number of records to process on the probe side
+ private int recordsToProcess = 0;
- // Number of records processed on the probe side
- private int recordsProcessed = 0;
+ // Number of records processed on the probe side
+ private int recordsProcessed = 0;
- // Number of records in the output container
- private int outputRecords;
+ // Number of records in the output container
+ private int outputRecords;
- // Indicate if we should drain the next record from the probe side
- private boolean getNextRecord = true;
+ // Indicate if we should drain the next record from the probe side
+ private boolean getNextRecord = true;
- // Contains both batch idx and record idx of the matching record in the build side
- private int currentCompositeIdx = -1;
+ // Contains both batch idx and record idx of the matching record in the build side
+ private int currentCompositeIdx = -1;
- // Current state the hash join algorithm is in
- private ProbeState probeState = ProbeState.PROBE_PROJECT;
+ // Current state the hash join algorithm is in
+ private ProbeState probeState = ProbeState.PROBE_PROJECT;
- // For outer or right joins, this is a list of unmatched records that need to be projected
- private List<Integer> unmatchedBuildIndexes = null;
+ // For outer or right joins, this is a list of unmatched records that need to be projected
+ private List<Integer> unmatchedBuildIndexes = null;
- @Override
- public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
- int probeRecordCount, RecordBatch outgoing, HashTable hashTable,
- HashJoinHelper hjHelper, JoinRelType joinRelType) {
+ @Override
+ public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+ int probeRecordCount, RecordBatch outgoing, HashTable hashTable,
+ HashJoinHelper hjHelper, JoinRelType joinRelType) {
- this.probeBatch = probeBatch;
- this.joinType = joinRelType;
- this.recordsToProcess = probeRecordCount;
- this.hashTable = hashTable;
- this.hjHelper = hjHelper;
+ this.probeBatch = probeBatch;
+ this.joinType = joinRelType;
+ this.recordsToProcess = probeRecordCount;
+ this.hashTable = hashTable;
+ this.hjHelper = hjHelper;
- doSetup(context, buildBatch, probeBatch, outgoing);
+ doSetup(context, buildBatch, probeBatch, outgoing);
+ }
+
+ public void executeProjectRightPhase() {
+ while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
+ boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
+ assert success;
}
+ }
- public void executeProjectRightPhase() {
- while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
- boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
- assert success;
+ public void executeProbePhase() throws SchemaChangeException {
+ while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+
+ // Check if we have processed all records in this batch; if so, we need to invoke next()
+ if (recordsProcessed == recordsToProcess) {
+
+ // Done processing all records in the previous batch, clean up!
+ for (VectorWrapper<?> wrapper : probeBatch) {
+ wrapper.getValueVector().clear();
}
- }
- public void executeProbePhase() throws SchemaChangeException {
- while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+ IterOutcome leftUpstream = probeBatch.next();
- // Check if we have processed all records in this batch; if so, we need to invoke next()
- if (recordsProcessed == recordsToProcess) {
+ switch (leftUpstream) {
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ recordsProcessed = 0;
+ recordsToProcess = 0;
+ probeState = ProbeState.DONE;
- // Done processing all records in the previous batch, clean up!
- for (VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
+ // We are done with the probe phase. If its a RIGHT or a FULL join get the unmatched indexes from the build side
+ if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
+ probeState = ProbeState.PROJECT_RIGHT;
+ }
- IterOutcome leftUpstream = probeBatch.next();
+ continue;
- switch (leftUpstream) {
- case NONE:
- case NOT_YET:
- case STOP:
- recordsProcessed = 0;
- recordsToProcess = 0;
- probeState = ProbeState.DONE;
+ case OK_NEW_SCHEMA:
+ throw new SchemaChangeException("Hash join does not support schema changes");
+ case OK:
+ recordsToProcess = probeBatch.getRecordCount();
+ recordsProcessed = 0;
+ }
+ }
+ int probeIndex;
- // We are done with the probe phase. If it's a RIGHT or a FULL join, get the unmatched indexes from the build side
- if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
- probeState = ProbeState.PROJECT_RIGHT;
- }
+ // Check if we need to drain the next row in the probe side
+ if (getNextRecord) {
+ probeIndex = hashTable.containsKey(recordsProcessed, true);
- continue;
+ if (probeIndex != -1) {
- case OK_NEW_SCHEMA:
- throw new SchemaChangeException("Hash join does not support schema changes");
- case OK:
- recordsToProcess = probeBatch.getRecordCount();
- recordsProcessed = 0;
- }
- }
- int probeIndex;
-
- // Check if we need to drain the next row in the probe side
- if (getNextRecord) {
- probeIndex = hashTable.containsKey(recordsProcessed, true);
-
- if (probeIndex != -1) {
-
- /* The current probe record has a key that matches. Get the index
- * of the first row in the build side that matches the current key
- */
- currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
-
- /* Record in the build side at currentCompositeIdx has a matching record in the probe
- * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
- * join we keep track of which records we need to project at the end
- */
- hjHelper.setRecordMatched(currentCompositeIdx);
-
- boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
- assert success;
- success = projectProbeRecord(recordsProcessed, outputRecords);
- assert success;
- outputRecords++;
-
- /* Projected single row from the build side with matching key but there
- * may be more rows with the same key. Check if that's the case
- */
- currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
- if (currentCompositeIdx == -1) {
- /* We only had one row in the build side that matched the current key
- * from the probe side. Drain the next row in the probe side.
- */
- recordsProcessed++;
- }
- else {
- /* There is more than one row with the same key on the build side
- * don't drain more records from the probe side till we have projected
- * all the rows with this key
- */
- getNextRecord = false;
- }
- }
- else { // No matching key
-
- // If we have a left outer join, project the keys
- if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
- projectProbeRecord(recordsProcessed, outputRecords++);
- }
- recordsProcessed++;
- }
+ /* The current probe record has a key that matches. Get the index
+ * of the first row in the build side that matches the current key
+ */
+ currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
+
+ /* Record in the build side at currentCompositeIdx has a matching record in the probe
+ * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
+ * join we keep track of which records we need to project at the end
+ */
+ hjHelper.setRecordMatched(currentCompositeIdx);
+
+ boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+ assert success;
+ success = projectProbeRecord(recordsProcessed, outputRecords);
+ assert success;
+ outputRecords++;
+
+ /* Projected single row from the build side with matching key but there
+ * may be more rows with the same key. Check if that's the case
+ */
+ currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+ if (currentCompositeIdx == -1) {
+ /* We only had one row in the build side that matched the current key
+ * from the probe side. Drain the next row in the probe side.
+ */
+ recordsProcessed++;
}
else {
- hjHelper.setRecordMatched(currentCompositeIdx);
- boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
- assert success;
- projectProbeRecord(recordsProcessed, outputRecords);
- outputRecords++;
-
- currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-
- if (currentCompositeIdx == -1) {
- // We don't have any more rows matching the current key on the build side, move on to the next probe row
- getNextRecord = true;
- recordsProcessed++;
- }
+ /* There is more than one row with the same key on the build side
+ * don't drain more records from the probe side till we have projected
+ * all the rows with this key
+ */
+ getNextRecord = false;
}
}
- }
+ else { // No matching key
- public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
+ // If we have a left outer join, project the keys
+ if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+ projectProbeRecord(recordsProcessed, outputRecords++);
+ }
+ recordsProcessed++;
+ }
+ }
+ else {
+ hjHelper.setRecordMatched(currentCompositeIdx);
+ boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+ assert success;
+ projectProbeRecord(recordsProcessed, outputRecords);
+ outputRecords++;
+
+ currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+
+ if (currentCompositeIdx == -1) {
+ // We don't have any more rows matching the current key on the build side, move on to the next probe row
+ getNextRecord = true;
+ recordsProcessed++;
+ }
+ }
+ }
+ }
- outputRecords = 0;
+ public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
- if (probeState == ProbeState.PROBE_PROJECT) {
- executeProbePhase();
- }
+ outputRecords = 0;
- if (probeState == ProbeState.PROJECT_RIGHT) {
+ if (probeState == ProbeState.PROBE_PROJECT) {
+ executeProbePhase();
+ }
- // We are here because we have a RIGHT OUTER or a FULL join
- if (unmatchedBuildIndexes == null) {
- // Initialize list of build indexes that didn't match a record on the probe side
- unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
- recordsToProcess = unmatchedBuildIndexes.size();
- recordsProcessed = 0;
- }
+ if (probeState == ProbeState.PROJECT_RIGHT) {
- // Project the list of unmatched records on the build side
- executeProjectRightPhase();
- }
+ // We are here because we have a RIGHT OUTER or a FULL join
+ if (unmatchedBuildIndexes == null) {
+ // Initialize list of build indexes that didn't match a record on the probe side
+ unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
+ recordsToProcess = unmatchedBuildIndexes.size();
+ recordsProcessed = 0;
+ }
- return outputRecords;
+ // Project the list of unmatched records on the build side
+ executeProjectRightPhase();
}
- public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
- @Named("outgoing") RecordBatch outgoing);
- public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
- public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+ return outputRecords;
+ }
+
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
+ @Named("outgoing") RecordBatch outgoing);
+ public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+ public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
}
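Stripped of the batching and buffer management, the per-record decision inside executeProbePhase() above reduces to: look the probe key up in the hash table; on a hit, walk the build-side chain and project one output row per duplicate, marking each build record matched for RIGHT/FULL bookkeeping; on a miss, LEFT and FULL joins still project the probe record. A hedged sketch, written as if it were a private method of HashJoinProbeTemplate and using only the members shown in this diff:

    // Sketch only: one probe record, minus batch draining and output limits.
    // hashTable, hjHelper, joinType, outputRecords and the project* methods
    // are the ones defined above; -1 marks an empty index.
    private void probeOneRecord(int probeIdx) throws SchemaChangeException {
      int keyIndex = hashTable.containsKey(probeIdx, true);
      if (keyIndex != -1) {                             // key exists on the build side
        int composite = hjHelper.getStartIndex(keyIndex);
        while (composite != -1) {                       // one output row per duplicate
          hjHelper.setRecordMatched(composite);         // RIGHT/FULL bookkeeping
          projectBuildRecord(composite, outputRecords);
          projectProbeRecord(probeIdx, outputRecords);
          outputRecords++;
          composite = hjHelper.getNextIndex(composite);
        }
      } else if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
        projectProbeRecord(probeIdx, outputRecords++);  // build side stays null
      }
    }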