You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/20 06:08:25 UTC
[17/17] git commit: DRILL-505: Hash Join
DRILL-505: Hash Join
Support for left outer, right outer and full joins
Support for multiple join conditions
Add the following tests:
- Multiple condition join
- Join on JSON scan
- Multi batch join
- Simple equality join
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1fc7b982
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1fc7b982
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1fc7b982
Branch: refs/heads/master
Commit: 1fc7b982414bc0dcd29b1d31e312d2207971933a
Parents: c7cb7ba
Author: Mehant Baid <me...@gmail.com>
Authored: Tue Feb 25 01:09:06 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 19 21:07:29 2014 -0700
----------------------------------------------------------------------
.../physical/base/AbstractPhysicalVisitor.java | 5 +
.../exec/physical/base/PhysicalVisitor.java | 1 +
.../drill/exec/physical/config/HashJoinPOP.java | 143 ++++++++
.../physical/impl/common/ChainedHashTable.java | 40 ++-
.../exec/physical/impl/common/HashTable.java | 2 +-
.../physical/impl/common/HashTableTemplate.java | 11 +-
.../exec/physical/impl/join/HashJoinBatch.java | 355 +++++++++++++++++++
.../impl/join/HashJoinBatchCreator.java | 37 ++
.../exec/physical/impl/join/HashJoinHelper.java | 222 ++++++++++++
.../exec/physical/impl/join/HashJoinProbe.java | 56 +++
.../impl/join/HashJoinProbeTemplate.java | 226 ++++++++++++
.../exec/physical/impl/join/TestHashJoin.java | 216 +++++++++++
.../apache/drill/exec/pop/PopUnitTestBase.java | 2 +-
.../src/test/resources/build_side_input.json | 24 ++
.../src/test/resources/join/hash_join.json | 63 ++++
.../resources/join/hash_join_multi_batch.json | 47 +++
.../join/hj_left_outer_multi_batch.json | 48 +++
.../resources/join/hj_multi_condition_join.json | 66 ++++
.../join/hj_right_outer_multi_batch.json | 48 +++
.../src/test/resources/probe_side_input.json | 28 ++
20 files changed, 1618 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 5f37487..abafc42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -106,6 +106,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
}
@Override
+ public T visitHashJoin(HashJoinPOP join, X value) throws E {
+ return visitOp(join, value);
+ }
+
+ @Override
public T visitHashPartitionSender(HashPartitionSender op, X value) throws E {
return visitSender(op, value);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 712fafe..208dab9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -41,6 +41,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP;
public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
+ public RETURN visitHashJoin(HashJoinPOP join, EXTRA value) throws EXCEP;
public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
public RETURN visitStreamingAggregate(StreamingAggregate agg, EXTRA value) throws EXCEP;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
new file mode 100644
index 0000000..f4a1fc7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.eigenbase.rel.JoinRelType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+/**
+ * Physical operator configuration for an equality join evaluated with a hash table.
+ *
+ * <p>The right input is the build side (its keys populate the hash table) and the left
+ * input is the probe side. The hash table configuration is derived from the join
+ * conditions in the constructor and is excluded from the JSON form.
+ */
+@JsonTypeName("hash-join")
+public class HashJoinPOP extends AbstractBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class);
+
+  private final PhysicalOperator left;
+  private final PhysicalOperator right;
+  private final List<JoinCondition> conditions;
+  private final JoinRelType joinType;
+  // Derived from 'conditions' in the constructor; not serialized (see getHtConfig()).
+  private final HashTableConfig htConfig;
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(0, 0, 0, 0);
+  }
+
+  /**
+   * @param left probe side input
+   * @param right build side input
+   * @param conditions join conditions; each must use the "==" relationship
+   * @param joinType join type
+   */
+  @JsonCreator
+  public HashJoinPOP(
+      @JsonProperty("left") PhysicalOperator left,
+      @JsonProperty("right") PhysicalOperator right,
+      @JsonProperty("join-conditions") List<JoinCondition> conditions,
+      @JsonProperty("join-type") JoinRelType joinType
+  ) {
+    this.left = left;
+    this.right = right;
+    this.conditions = Preconditions.checkNotNull(conditions, "join-conditions must be specified");
+    this.joinType = joinType;
+
+    int conditionsSize = conditions.size();
+
+    NamedExpression[] rightExpr = new NamedExpression[conditionsSize];
+    NamedExpression[] leftExpr = new NamedExpression[conditionsSize];
+
+    for (int i = 0; i < conditionsSize; i++) {
+      // Right-hand expressions are the build keys, left-hand ones the probe keys.
+      rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i));
+      leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+
+      // Hash join only supports equality currently.
+      assert conditions.get(i).getRelationship().equals("==");
+    }
+
+    this.htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY,
+        HashTable.DEFAULT_LOAD_FACTOR,
+        rightExpr, leftExpr);
+  }
+
+  @Override
+  public Size getSize() {
+    return left.getSize().add(right.getSize());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitHashJoin(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.size() == 2);
+    return new HashJoinPOP(children.get(0), children.get(1), conditions, joinType);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.forArray(left, right);
+  }
+
+  public PhysicalOperator getLeft() {
+    return left;
+  }
+
+  public PhysicalOperator getRight() {
+    return right;
+  }
+
+  // The getter names differ from the creator's property names, so they are pinned
+  // explicitly; otherwise Jackson would write "joinType"/"conditions", which the
+  // @JsonCreator above cannot read back.
+  @JsonProperty("join-type")
+  public JoinRelType getJoinType() {
+    return joinType;
+  }
+
+  @JsonProperty("join-conditions")
+  public List<JoinCondition> getConditions() {
+    return conditions;
+  }
+
+  // Derived from the join conditions, so it is rebuilt on deserialization instead of written out.
+  @JsonIgnore
+  public HashTableConfig getHtConfig() {
+    return htConfig;
+  }
+
+  /**
+   * Returns an equivalent LEFT join with the inputs swapped and every condition
+   * flipped if this is a RIGHT join; otherwise returns this operator unchanged.
+   */
+  public HashJoinPOP flipIfRight() {
+    if (joinType != JoinRelType.RIGHT) {
+      return this;
+    }
+    List<JoinCondition> flippedConditions = new ArrayList<>(conditions.size());
+    for (JoinCondition c : conditions) {
+      flippedConditions.add(c.flip());
+    }
+    return new HashJoinPOP(right, left, flippedConditions, JoinRelType.LEFT);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 507be33..ec579fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -84,6 +84,7 @@ public class ChainedHashTable {
private final MappingSet KeyMatchIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, KEY_MATCH_BUILD, KEY_MATCH_BUILD);
private final MappingSet KeyMatchIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, KEY_MATCH_PROBE, KEY_MATCH_PROBE);
private final MappingSet KeyMatchHtableMapping = new MappingSet("htRowIdx", null, "htContainer", null, KEY_MATCH_BUILD, KEY_MATCH_BUILD);
+ private final MappingSet KeyMatchHtableProbeMapping = new MappingSet("htRowIdx", null, "htContainer", null, KEY_MATCH_PROBE, KEY_MATCH_PROBE);
private final MappingSet GetHashIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, GET_HASH_BUILD, GET_HASH_BUILD);
private final MappingSet GetHashIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, GET_HASH_PROBE, GET_HASH_PROBE);
private final MappingSet SetValueMapping = new MappingSet("incomingRowIdx" /* read index */, "htRowIdx" /* write index */, "incomingBuild" /* read container */, "htContainer" /* write container */, SET_VALUE, SET_VALUE);
@@ -114,10 +115,6 @@ public class ChainedHashTable {
ClassGenerator<HashTable> cg = top.getRoot();
ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder");
- if (outKeyFieldIds.length > htConfig.getKeyExprsBuild().length) {
- throw new IllegalArgumentException("Mismatched number of output key fields.");
- }
-
LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().length];
LogicalExpression[] keyExprsProbe = null;
boolean isProbe = (htConfig.getKeyExprsProbe() != null) ;
@@ -146,27 +143,34 @@ public class ChainedHashTable {
i++;
}
- if (isProbe) {
+ if (isProbe) {
+ i = 0;
for (NamedExpression ne : htConfig.getKeyExprsProbe()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry());
if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
if (expr == null) continue;
keyExprsProbe[i] = expr;
+ i++;
}
}
// generate code for isKeyMatch(), setValue(), getHash() and outputRecordKeys()
-
setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild, htKeyFieldIds);
- setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableMapping, keyExprsProbe, htKeyFieldIds) ;
+ setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableProbeMapping, keyExprsProbe, htKeyFieldIds) ;
setupSetValue(cgInner, keyExprsBuild, htKeyFieldIds);
- setupOutputRecordKeys(cgInner, htKeyFieldIds, outKeyFieldIds);
+ if (outgoing != null) {
+
+ if (outKeyFieldIds.length > htConfig.getKeyExprsBuild().length) {
+ throw new IllegalArgumentException("Mismatched number of output key fields.");
+ }
+ }
+ setupOutputRecordKeys(cgInner, htKeyFieldIds, outKeyFieldIds);
setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingBuildMapping, keyExprsBuild);
setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, keyExprsProbe);
- HashTable ht = context.getImplementationClass(top);
+ HashTable ht = context.getImplementationClass(top);
ht.setup(htConfig, context, incomingBuild, incomingProbe, outgoing, htContainerOrig);
return ht;
@@ -227,14 +231,18 @@ public class ChainedHashTable {
cg.setMappingSet(OutputRecordKeysMapping);
- for (int i = 0; i < outKeyFieldIds.length; i++) {
- ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]);
- ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, true);
- HoldingContainer hc = cg.addExpr(vvwExpr);
- cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
- }
+ if (outKeyFieldIds != null) {
+ for (int i = 0; i < outKeyFieldIds.length; i++) {
+ ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]);
+ ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, true);
+ HoldingContainer hc = cg.addExpr(vvwExpr);
+ cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
- cg.getEvalBlock()._return(JExpr.TRUE);
+ cg.getEvalBlock()._return(JExpr.TRUE);
+ } else {
+ cg.getEvalBlock()._return(JExpr.FALSE);
+ }
}
private void setupGetHash(ClassGenerator<HashTable> cg, MappingSet incomingMapping, LogicalExpression[] keyExprs) throws SchemaChangeException {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index d9321b9..2f1172a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -49,7 +49,7 @@ public interface HashTable {
public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder);
- public int containsKey(int incomingRowIdx);
+ public int containsKey(int incomingRowIdx, boolean isProbe);
public int size();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 775766d..f67939e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -371,11 +371,13 @@ public abstract class HashTableTemplate implements HashTable {
}
public void clear() {
- for (BatchHolder bh : batchHolders) {
- bh.clear();
+ if (batchHolders != null) {
+ for (BatchHolder bh : batchHolders) {
+ bh.clear();
+ }
+ batchHolders.clear();
+ batchHolders = null;
}
- batchHolders.clear();
- batchHolders = null;
startIndices.clear();
currentIdxHolder = null;
numEntries = 0;
@@ -486,6 +488,7 @@ public abstract class HashTableTemplate implements HashTable {
}
// Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
+ @Override
public int containsKey(int incomingRowIdx, boolean isProbe) {
int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx);
int i = getBucketIndex(hash, numBuckets());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
new file mode 100644
index 0000000..7ada651
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.record.*;
+import org.eigenbase.rel.JoinRelType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
+import com.sun.codemodel.JExpr;
+
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+/**
+ * Record batch that performs a hash join of two inputs.
+ *
+ * <p>On the first call to {@link #next()} the whole right (build) input is consumed: its
+ * join keys are inserted into a {@link HashTable} and its batches are kept in an
+ * {@link ExpandableHyperContainer}. Rows from the left (probe) input are then matched and
+ * projected into this batch's output container by a run-time generated {@link HashJoinProbe}.
+ */
+public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+  // Probe side record batch
+  private final RecordBatch left;
+
+  // Build side record batch
+  private final RecordBatch right;
+
+  // Join type: INNER, LEFT, RIGHT or FULL
+  private final JoinRelType joinType;
+
+  // hash table configuration, created in HashJoinPOP
+  private HashTableConfig htConfig;
+
+  // Runtime generated class implementing HashJoinProbe interface
+  private HashJoinProbe hashJoinProbe = null;
+
+  /* Helper class
+   * Maintains linked list of build side records with the same key
+   * Keeps information about which build records have a corresponding
+   * matching key in the probe side (for outer, right joins)
+   */
+  private HashJoinHelper hjHelper = null;
+
+  // Underlying hashtable used by the hash join
+  private HashTable hashTable = null;
+
+  /* Hyper container to store all build side record batches.
+   * Records are retrieved from this container when there is a matching record
+   * on the probe side. Remains null until the first build batch arrives.
+   */
+  private ExpandableHyperContainer hyperContainer;
+
+  // Number of records in the output container
+  private int outputRecords;
+
+  // Current batch index on the build side
+  private int buildBatchIndex = 0;
+
+  // List of vector allocators
+  private List<VectorAllocator> allocators = null;
+
+  // Schema of the build side
+  private BatchSchema rightSchema = null;
+
+  // Generator mapping for the build side
+  private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
+      "projectBuildRecord" /* eval method */,
+      null /* reset */, null /* cleanup */);
+
+  // Generator mapping for the probe side
+  private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
+      "projectProbeRecord" /* eval method */,
+      null /* reset */, null /* cleanup */);
+
+  // Mapping set for the build side
+  private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
+      "buildBatch" /* read container */,
+      "outgoing" /* write container */,
+      PROJECT_BUILD, PROJECT_BUILD);
+
+  // Mapping set for the probe side
+  private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+      "probeBatch" /* read container */,
+      "outgoing" /* write container */,
+      PROJECT_PROBE, PROJECT_PROBE);
+
+  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
+    super(popConfig, context);
+    this.left = left;
+    this.right = right;
+    this.joinType = popConfig.getJoinType();
+    this.htConfig = popConfig.getHtConfig();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return outputRecords;
+  }
+
+  @Override
+  public IterOutcome next() {
+
+    try {
+      /* If we are here for the first time, execute the build phase of the
+       * hash join and setup the run time generated class for the probe side
+       */
+      if (hashJoinProbe == null) {
+
+        // Initialize the hash join helper context
+        hjHelper = new HashJoinHelper(context);
+
+        /* Build phase requires setting up the hash table. Hash table will
+         * materialize both the build and probe side expressions while
+         * creating the hash table. So we need to invoke next() on our probe batch
+         * as well, for the materialization to be successful. This batch will not be used
+         * till we complete the build phase.
+         * NOTE(review): the IterOutcome of this call is not inspected.
+         */
+        left.next();
+
+        // Build the hash table, using the build side record batches.
+        executeBuildPhase();
+
+        // Create the run time generated code needed to probe and project
+        hashJoinProbe = setupHashJoinProbe();
+      }
+
+      // Allocate the memory for the vectors in the output container
+      allocateVectors();
+
+      // Store the number of records projected
+      outputRecords = hashJoinProbe.probeAndProject();
+
+      /* We are here because of one of the following
+       * 1. Completed processing of all the records and we are done
+       * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+       * Either case build the output container's schema and return
+       */
+      if (outputRecords > 0) {
+
+        // Build the container schema and set the counts
+        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+        container.setRecordCount(outputRecords);
+
+        for (VectorWrapper<?> v : container) {
+          v.getValueVector().getMutator().setValueCount(outputRecords);
+        }
+
+        return IterOutcome.OK_NEW_SCHEMA;
+      }
+
+      // No more output records, clean up and return
+      cleanup();
+      return IterOutcome.NONE;
+
+    } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+      context.fail(e);
+      // killIncoming() already calls cleanup(); calling it again here would
+      // release the inputs and containers twice.
+      killIncoming();
+      return IterOutcome.STOP;
+    }
+  }
+
+  public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+
+    // Shouldn't be recreating the hash table, this should be done only once
+    assert hashTable == null;
+
+    // Right is the build input and left the probe input; no outgoing batch is supplied.
+    ChainedHashTable ht = new ChainedHashTable(htConfig, context, this.right, this.left, null);
+    hashTable = ht.createAndSetupHashTable(null);
+  }
+
+  public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
+
+    // Fetch the first build batch before creating the hash table, which materializes
+    // the key expressions against the incoming batches (see the note in next()).
+    IterOutcome rightUpstream = right.next();
+
+    boolean moreData = true;
+
+    // Setup the underlying hash table
+    setupHashTable();
+
+    while (moreData) {
+
+      switch (rightUpstream) {
+
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          // NOTE(review): NOT_YET and STOP end the build phase exactly like NONE, so the
+          // hash table then holds only the batches received so far.
+          moreData = false;
+          continue;
+
+        case OK_NEW_SCHEMA:
+          if (rightSchema == null) {
+            rightSchema = right.getSchema();
+          } else {
+            throw new SchemaChangeException("Hash join does not support schema changes");
+          }
+          // Fall through
+        case OK:
+          int currentRecordCount = right.getRecordCount();
+
+          /* For every new build batch, we store some state in the helper context
+           * Add new state to the helper context
+           */
+          hjHelper.addNewBatch(currentRecordCount);
+
+          // Holder contains the global index where the key is hashed into using the hash table
+          IntHolder htIndex = new IntHolder();
+
+          // For every record in the build batch , hash the key columns
+          for (int i = 0; i < currentRecordCount; i++) {
+
+            HashTable.PutStatus status = hashTable.put(i, htIndex);
+
+            if (status != HashTable.PutStatus.PUT_FAILED) {
+              /* Use the global index returned by the hash table, to store
+               * the current record index and batch index. This will be used
+               * later when we probe and find a match.
+               */
+              hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+            }
+          }
+
+          /* Completed hashing all records in this batch. Transfer the batch
+           * to the hyper vector container. Will be used when we want to retrieve
+           * records that have matching keys on the probe side.
+           */
+          RecordBatchData nextBatch = new RecordBatchData(right);
+          if (hyperContainer == null) {
+            hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+          } else {
+            hyperContainer.addBatch(nextBatch.getContainer());
+          }
+
+          // completed processing a batch, increment batch index
+          buildBatchIndex++;
+          break;
+      }
+      // Get the next record batch
+      rightUpstream = right.next();
+    }
+  }
+
+  public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+
+    allocators = new ArrayList<>();
+
+    final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    ClassGenerator<HashJoinProbe> g = cg.getRoot();
+
+    // Generate the code to project build side records
+    g.setMappingSet(projectBuildMapping);
+
+    int fieldId = 0;
+    // buildIndex packs the hyper-container batch index into its upper 16 bits and the
+    // record index within that batch into its lower 16 bits.
+    JExpression buildIndex = JExpr.direct("buildIndex");
+    JExpression outIndex = JExpr.direct("outIndex");
+    g.rotateBlock();
+    // NOTE(review): hyperContainer is still null here if the build side produced no
+    // batches, in which case this loop throws a NullPointerException.
+    for (VectorWrapper<?> vv : hyperContainer) {
+
+      // Add the vector to our output container
+      ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator());
+      container.add(v);
+      allocators.add(RemovingRecordBatch.getAllocator4(v));
+
+      JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true));
+      JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+
+      g.getEvalBlock().add(outVV.invoke("copyFrom")
+          .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+          .arg(outIndex)
+          .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
+
+      fieldId++;
+    }
+
+    // Generate the code to project probe side records; their output columns follow the build side ones
+    g.setMappingSet(projectProbeMapping);
+
+    int outputFieldId = fieldId;
+    fieldId = 0;
+    JExpression probeIndex = JExpr.direct("probeIndex");
+    for (VectorWrapper<?> vv : left) {
+
+      ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator());
+      container.add(v);
+      allocators.add(RemovingRecordBatch.getAllocator4(v));
+
+      JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false));
+      JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false));
+
+      g.getEvalBlock().add(outVV.invoke("copyFrom").arg(probeIndex).arg(outIndex).arg(inVV));
+
+      fieldId++;
+      outputFieldId++;
+    }
+
+    HashJoinProbe hj = context.getImplementationClass(cg);
+    hj.setupHashJoinProbe(context, hyperContainer, left, this, hashTable, hjHelper, joinType);
+    return hj;
+  }
+
+  private void allocateVectors(){
+    for(VectorAllocator a : allocators){
+      a.alloc(RecordBatch.MAX_BATCH_SIZE);
+    }
+  }
+
+  @Override
+  public void killIncoming() {
+    this.left.kill();
+    this.right.kill();
+    cleanup();
+  }
+
+  /**
+   * Cleans up both inputs and clears the build side batches, the helper state, the
+   * output container and the hash table. Members created lazily on the first call to
+   * next() are skipped if they were never created.
+   */
+  @Override
+  public void cleanup() {
+    left.cleanup();
+    right.cleanup();
+    if (hyperContainer != null) {
+      hyperContainer.clear();
+    }
+    if (hjHelper != null) {
+      hjHelper.clear();
+    }
+    container.clear();
+    if (hashTable != null) {
+      hashTable.clear();
+    }
+    super.cleanup();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
new file mode 100644
index 0000000..19a4a29
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Creates the {@link HashJoinBatch} for a {@link HashJoinPOP}. The operator must have
+ * exactly two children: the first becomes the left (probe) input and the second the
+ * right (build) input.
+ */
+public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 2);
+    RecordBatch probeSide = children.get(0);
+    RecordBatch buildSide = children.get(1);
+    return new HashJoinBatch(config, context, probeSide, buildSide);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
new file mode 100644
index 0000000..e0098b1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+
+
+/*
+ * Helper class for hash join. Keeps track of information about the build side batches.
+ *
+ * Hash join is a blocking operator, so we consume all the batches on the build side and
+ * store them in a hyper container. The way we can retrieve records from the hyper container
+ * is by providing the record index and batch index in the hyper container. When we invoke put()
+ * for a given row, hash table returns a global index. We store the current row's record index
+ * and batch index in this global index of the startIndices structure.
+ *
+ * Since there can be many rows with the same key on the build side, we store the first
+ * index in the startIndices list and the remaining are stored as a logical linked list using
+ * the 'links' field in the BuildInfo structures.
+ *
+ * Apart from the indexes into the hyper container, this class also stores information about
+ * which records of the build side had a matching record on the probe side. Stored in a bitvector
+ * keyMatchBitVector, it is used to retrieve all records that did not match a record on probe side
+ * for right outer and full outer joins
+ */
+public class HashJoinHelper {
+
+ /* List of start indexes. Stores the record and batch index of the first record
+ * with a give key.
+ */
+ List<SelectionVector4> startIndices = new ArrayList<>();
+
+ // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
+ List<BuildInfo> buildInfoList = new ArrayList<>();
+
+ // Fragment context
+ FragmentContext context;
+
+ // Constant to indicate index is empty.
+ static final int INDEX_EMPTY = -1;
+
+ // bits to shift while obtaining batch index from SV4
+ static final int SHIFT_SIZE = 16;
+
+ public HashJoinHelper(FragmentContext context) {
+ this.context = context;
+ }
+
+ public void addStartIndexBatch() throws SchemaChangeException {
+ startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
+ }
+
+ public class BuildInfo {
+ // List of links. Logically it helps maintain a linked list of records with the same key value
+ private SelectionVector4 links;
+
+ // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
+ private BitSet keyMatchBitVector;
+
+ // number of records in this batch
+ private int recordCount;
+
+ public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
+ this.links = links;
+ this.keyMatchBitVector = keyMatchBitVector;
+ this.recordCount = recordCount;
+ }
+
+ public SelectionVector4 getLinks() {
+ return links;
+ }
+
+ public BitSet getKeyMatchBitVector() {
+ return keyMatchBitVector;
+ }
+ }
+
+ public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
+
+ ByteBuf vector = context.getAllocator().buffer((recordCount * 4));
+
+ SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
+
+ // Initialize the vector
+ for (int i = 0; i < recordCount; i++) {
+ sv4.set(i, INDEX_EMPTY);
+ }
+
+ return sv4;
+ }
+
+ public void addNewBatch(int recordCount) throws SchemaChangeException {
+ // Add a node to the list of BuildInfo's
+ BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
+ buildInfoList.add(info);
+ }
+
+ public int getStartIndex(int keyIndex) {
+ int batchIdx = keyIndex / HashTable.BATCH_SIZE;
+ int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+ assert batchIdx < startIndices.size();
+
+ SelectionVector4 sv4 = startIndices.get(batchIdx);
+
+ return sv4.get(offsetIdx);
+ }
+
+ public int getNextIndex(int currentIdx) {
+ // Get to the links field of the current index to get the next index
+ int batchIdx = currentIdx >>> SHIFT_SIZE;
+ int recordIdx = currentIdx & HashTable.BATCH_MASK;
+
+ assert batchIdx < buildInfoList.size();
+
+ // Get the corresponding BuildInfo node
+ BuildInfo info = buildInfoList.get(batchIdx);
+ return info.getLinks().get(recordIdx);
+ }
+
+ public List<Integer> getNextUnmatchedIndex() {
+ List<Integer> compositeIndexes = new ArrayList<>();
+
+ for (int i = 0; i < buildInfoList.size(); i++) {
+ BuildInfo info = buildInfoList.get(i);
+ int fromIndex = 0;
+
+ while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
+ compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
+ fromIndex++;
+ }
+ }
+ return compositeIndexes;
+ }
+
+ public void setRecordMatched(int index) {
+ int batchIdx = index >>> SHIFT_SIZE;
+ int recordIdx = index & HashTable.BATCH_MASK;
+
+ // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
+ BuildInfo info = buildInfoList.get(batchIdx);
+ BitSet bitVector = info.getKeyMatchBitVector();
+
+ bitVector.set(recordIdx);
+ }
+
+ public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
+
+ /* set the current record batch index and the index
+ * within the batch at the specified keyIndex. The keyIndex
+ * denotes the global index where the key for this record is
+ * stored in the hash table
+ */
+ int batchIdx = keyIndex / HashTable.BATCH_SIZE;
+ int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+ if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
+ // allocate a new batch
+ addStartIndexBatch();
+ }
+
+ SelectionVector4 startIndex = startIndices.get(batchIdx);
+ int linkIndex;
+
+ // If its the first value for this key
+ if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
+ startIndex.set(offsetIdx, batchIndex, recordIndex);
+ } else {
+ /* we already have encountered a record with the same key
+ * use links to store this value
+ */
+ SelectionVector4 link;
+ do {
+ //Traverse the links to get an empty slot to insert the current index
+ batchIdx = linkIndex >>> SHIFT_SIZE;
+ offsetIdx = linkIndex & Character.MAX_VALUE;
+
+ // get the next link
+ link = buildInfoList.get(batchIdx).getLinks();
+ } while ((linkIndex = link.get(offsetIdx)) != INDEX_EMPTY);
+
+ // We have the correct batchIdx and offset within the batch to store the next link
+ link.set(offsetIdx, batchIndex, recordIndex);
+ }
+ }
+
+ public void clear() {
+ // Clear the SV4 used for start indices
+ for (SelectionVector4 sv4: startIndices) {
+ sv4.clear();
+ }
+
+ for (BuildInfo info : buildInfoList) {
+ info.getLinks().clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
new file mode 100644
index 0000000..c99f2a6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+import org.eigenbase.rel.JoinRelType;
+
+import java.io.IOException;
+
+public interface HashJoinProbe {
+ public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+
+ /* The probe side of the hash join can be in the following two states
+ * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
+ * key match and if we do we project the record
+ * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
+ * from the build side that did not match any records on the probe side. For Left outer
+ * case we handle it internally by projecting the record if there isn't a match on the build side
+ * 3. DONE: Once we have projected all possible records we are done
+ */
+ public static enum ProbeState {
+ PROBE_PROJECT, PROJECT_RIGHT, DONE
+ }
+
+ public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+ RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, JoinRelType joinRelType);
+ public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
+ public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
+ public abstract void projectBuildRecord(int buildIndex, int outIndex);
+ public abstract void projectProbeRecord(int probeIndex, int outIndex);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
new file mode 100644
index 0000000..cc1a257
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.sql2rel.StandardConvertletTable;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class HashJoinProbeTemplate implements HashJoinProbe {
+
+ // Probe side record batch
+ private RecordBatch probeBatch;
+
+ // Join type, INNER, LEFT, RIGHT or OUTER
+ private JoinRelType joinType;
+
+ /* Helper class
+ * Maintains linked list of build side records with the same key
+ * Keeps information about which build records have a corresponding
+ * matching key in the probe side (for outer, right joins)
+ */
+ private HashJoinHelper hjHelper = null;
+
+ // Underlying hashtable used by the hash join
+ private HashTable hashTable = null;
+
+ // Number of records to process on the probe side
+ private int recordsToProcess = 0;
+
+ // Number of records processed on the probe side
+ private int recordsProcessed = 0;
+
+ // Number of records in the output container
+ private int outputRecords;
+
+ // Indicate if we should drain the next record from the probe side
+ private boolean getNextRecord = true;
+
+ // Contains both batch idx and record idx of the matching record in the build side
+ private int currentCompositeIdx = -1;
+
+ // Current state the hash join algorithm is in
+ private ProbeState probeState = ProbeState.PROBE_PROJECT;
+
+ // For outer or right joins, this is a list of unmatched records that needs to be projected
+ private List<Integer> unmatchedBuildIndexes = null;
+
+ @Override
+ public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+ RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
+ JoinRelType joinRelType) {
+
+ this.probeBatch = probeBatch;
+ this.joinType = joinRelType;
+ this.recordsToProcess = probeBatch.getRecordCount();
+ this.hashTable = hashTable;
+ this.hjHelper = hjHelper;
+
+ doSetup(context, buildBatch, probeBatch, outgoing);
+ }
+
+ public void executeProjectRightPhase() {
+ while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
+ projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
+ }
+ }
+
+ public void executeProbePhase() throws SchemaChangeException {
+ while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+
+ // Check if we have processed all records in this batch we need to invoke next
+ if (recordsProcessed == recordsToProcess) {
+ IterOutcome leftUpstream = probeBatch.next();
+
+ switch (leftUpstream) {
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ recordsProcessed = 0;
+ recordsToProcess = 0;
+ probeState = ProbeState.DONE;
+
+ // We are done with the probe phase. If its a RIGHT or a FULL join get the unmatched indexes from the build side
+ if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
+ probeState = ProbeState.PROJECT_RIGHT;
+ }
+
+ continue;
+
+ case OK_NEW_SCHEMA:
+ throw new SchemaChangeException("Hash join does not support schema changes");
+ case OK:
+ recordsToProcess = probeBatch.getRecordCount();
+ recordsProcessed = 0;
+ }
+ }
+ int probeIndex;
+
+ // Check if we need to drain the next row in the probe side
+ if (getNextRecord) {
+ probeIndex = hashTable.containsKey(recordsProcessed, true);
+
+ if (probeIndex != -1) {
+
+ /* The current probe record has a key that matches. Get the index
+ * of the first row in the build side that matches the current key
+ */
+ currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
+
+ /* Record in the build side at currentCompositeIdx has a matching record in the probe
+ * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
+ * join we keep track of which records we need to project at the end
+ */
+ hjHelper.setRecordMatched(currentCompositeIdx);
+
+ projectBuildRecord(currentCompositeIdx, outputRecords);
+ projectProbeRecord(recordsProcessed, outputRecords);
+ outputRecords++;
+
+ /* Projected single row from the build side with matching key but there
+ * may be more rows with the same key. Check if that's the case
+ */
+ currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+ if (currentCompositeIdx == -1) {
+ /* We only had one row in the build side that matched the current key
+ * from the probe side. Drain the next row in the probe side.
+ */
+ recordsProcessed++;
+ }
+ else {
+ /* There is more than one row with the same key on the build side
+ * don't drain more records from the probe side till we have projected
+ * all the rows with this key
+ */
+ getNextRecord = false;
+ }
+ }
+ else { // No matching key
+
+ // If we have a left outer join, project the keys
+ if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+ projectProbeRecord(recordsProcessed, outputRecords++);
+ }
+ recordsProcessed++;
+ }
+ }
+ else {
+ hjHelper.setRecordMatched(currentCompositeIdx);
+ projectBuildRecord(currentCompositeIdx, outputRecords);
+ projectProbeRecord(recordsProcessed, outputRecords);
+ outputRecords++;
+
+ currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+
+ if (currentCompositeIdx == -1) {
+ // We don't have any more rows matching the current key on the build side, move on to the next probe row
+ getNextRecord = true;
+ recordsProcessed++;
+ }
+ }
+ }
+ }
+
+ public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
+
+ outputRecords = 0;
+
+ if (probeState == ProbeState.PROBE_PROJECT) {
+ executeProbePhase();
+ }
+
+ if (probeState == ProbeState.PROJECT_RIGHT) {
+
+ // We are here because we have a RIGHT OUTER or a FULL join
+ if (unmatchedBuildIndexes == null) {
+ // Initialize list of build indexes that didn't match a record on the probe side
+ unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
+ recordsToProcess = unmatchedBuildIndexes.size();
+ recordsProcessed = 0;
+ }
+
+ // Project the list of unmatched records on the build side
+ executeProjectRightPhase();
+ }
+
+ return outputRecords;
+ }
+
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
+ @Named("outgoing") RecordBatch outgoing);
+ public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+ public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
new file mode 100644
index 0000000..529563a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.codahale.metrics.MetricRegistry;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.IntVector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+
+public class TestHashJoin extends PopUnitTestBase{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestMergeJoin.class);
+
+ DrillConfig c = DrillConfig.create();
+
+ private void testHJMockScanCommon(final DrillbitContext bitContext, UserServer.UserClientConnection connection, String physicalPlan, int expectedRows) throws Throwable {
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry();
+ bitContext.getAllocator(); result = new TopLevelAllocator();
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
+ }};
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(physicalPlan), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ int totalRecordCount = 0;
+ while (exec.next()) {
+ totalRecordCount += exec.getRecordCount();
+ }
+ assertEquals(expectedRows, totalRecordCount);
+ System.out.println("Total Record Count: " + totalRecordCount);
+ if (context.getFailureCause() != null)
+ throw context.getFailureCause();
+ assertTrue(!context.isFailed());
+ }
+
+ @Test
+ public void multiBatchEqualityJoin(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+ testHJMockScanCommon(bitContext, connection, "/join/hash_join_multi_batch.json", 200000);
+ }
+
+ @Test
+ public void multiBatchRightOuterJoin(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+ testHJMockScanCommon(bitContext, connection, "/join/hj_right_outer_multi_batch.json", 100000);
+ }
+
+ @Test
+ public void multiBatchLeftOuterJoin(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+
+ testHJMockScanCommon(bitContext, connection, "/join/hj_left_outer_multi_batch.json", 100000);
+ }
+
+ @Test
+ public void simpleEqualityJoin(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+ // Function checks for casting from Float, Double to Decimal data types
+ try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit bit = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+ // run query.
+ bit.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/join/hash_join.json"), Charsets.UTF_8)
+ .replace("#{TEST_FILE_1}", FileUtils.getResourceAsFile("/build_side_input.json").toURI().toString())
+ .replace("#{TEST_FILE_2}", FileUtils.getResourceAsFile("/probe_side_input.json").toURI().toString()));
+
+ RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+
+ QueryResultBatch batch = results.get(0);
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+
+ batchLoader.getValueAccessorById(0, IntVector.class);
+
+ Iterator<VectorWrapper<?>> itr = batchLoader.iterator();
+
+ // Just test the join key
+ long colA[] = {1, 1, 2, 2, 1, 1};
+
+ // Check the output of decimal9
+ ValueVector.Accessor intAccessor1 = itr.next().getValueVector().getAccessor();
+
+
+ for (int i = 0; i < intAccessor1.getValueCount(); i++) {
+ assertEquals(intAccessor1.getObject(i), colA[i]);
+ }
+ assertEquals(6, intAccessor1.getValueCount());
+ }
+ }
+
+ @Test
+ public void multipleConditionJoin(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+ // Function checks for casting from Float, Double to Decimal data types
+ try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit bit = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+ // run query.
+ bit.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/join/hj_multi_condition_join.json"), Charsets.UTF_8)
+ .replace("#{TEST_FILE_1}", FileUtils.getResourceAsFile("/build_side_input.json").toURI().toString())
+ .replace("#{TEST_FILE_2}", FileUtils.getResourceAsFile("/probe_side_input.json").toURI().toString()));
+
+ RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+
+ QueryResultBatch batch = results.get(0);
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+
+ batchLoader.getValueAccessorById(0, IntVector.class);
+
+ Iterator<VectorWrapper<?>> itr = batchLoader.iterator();
+
+ // Just test the join key
+ long colA[] = {1, 2, 1};
+ long colC[] = {100, 200, 500};
+
+ // Check the output of decimal9
+ ValueVector.Accessor intAccessor1 = itr.next().getValueVector().getAccessor();
+ ValueVector.Accessor intAccessor2 = itr.next().getValueVector().getAccessor();
+
+
+ for (int i = 0; i < intAccessor1.getValueCount(); i++) {
+ assertEquals(intAccessor1.getObject(i), colA[i]);
+ assertEquals(intAccessor2.getObject(i), colC[i]);
+ }
+ assertEquals(3, intAccessor1.getValueCount());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index 78f7e43..e5cd508 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -45,7 +45,7 @@ public abstract class PopUnitTestBase {
protected static DrillConfig CONFIG;
// Set a timeout unless we're debugging.
- @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(25000);
+ @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(500000);
@BeforeClass
public static void setup() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/build_side_input.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/build_side_input.json b/exec/java-exec/src/test/resources/build_side_input.json
new file mode 100644
index 0000000..31006a6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/build_side_input.json
@@ -0,0 +1,24 @@
+{
+"A": 1,
+"C": 100
+}
+{
+"A": 1,
+"C": 500
+}
+{
+"A": 2,
+"C": 200
+}
+{
+"A": 3,
+"C": 300
+}
+{
+"A": 4,
+"C": 400
+}
+{
+"A": 5,
+"C": 500
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hash_join.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hash_join.json b/exec/java-exec/src/test/resources/join/hash_join.json
new file mode 100644
index 0000000..60b915b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hash_join.json
@@ -0,0 +1,63 @@
+{
+ "head" : {
+ "version" : 1,
+ "generator" : {
+ "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+ "info" : ""
+ },
+ "type" : "APACHE_DRILL_PHYSICAL",
+ "resultMode" : "EXEC"
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "file:///"},
+ files:["#{TEST_FILE_1}"]
+ },
+ {
+ @id:2,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "file:///"},
+ files:["#{TEST_FILE_2}"]
+ },
+ {
+ "pop" : "project",
+ "@id" : 3,
+ "exprs" : [ {
+ "ref" : "output.A",
+ "expr" : "A"
+ },
+ { "ref" : "output.CCOL", "expr" : "C" }
+ ],
+
+ "child" : 1
+ },
+ {
+ "pop" : "project",
+ "@id" : 4,
+ "exprs" : [ {
+ "ref" : "output.B",
+ "expr" : "B"
+ },
+ { "ref" : "output.DCOL", "expr" : "D" }
+ ],
+
+ "child" : 2
+ },
+ {
+ @id: 5,
+ right: 3,
+ left: 4,
+ pop: "hash-join",
+ join-conditions: [ {relationship: "==", left: "B", right: "A"} ]
+ },
+ {
+ @id: 6,
+ child: 5,
+ pop: "screen"
+ }
+ ]
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json b/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json
new file mode 100644
index 0000000..7e218a4
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json
@@ -0,0 +1,47 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://source1.apache.org",
+ entries:[
+ {records: 4, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "INT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ pop:"mock-sub-scan",
+ url: "http://source2.apache.org",
+ entries:[
+ {records: 100000, types: [
+ {name: "blue1", type: "INT", mode: "REQUIRED"},
+ {name: "red1", type: "INT", mode: "REQUIRED"},
+ {name: "green1", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id: 3,
+ right: 1,
+ left: 2,
+ pop: "hash-join",
+ join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json b/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json
new file mode 100644
index 0000000..0cb5a03
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json
@@ -0,0 +1,48 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://source1.apache.org",
+ entries:[
+ {records: 1, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "INT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ pop:"mock-sub-scan",
+ url: "http://source2.apache.org",
+ entries:[
+ {records: 100000, types: [
+ {name: "blue1", type: "INT", mode: "REQUIRED"},
+ {name: "red1", type: "INT", mode: "REQUIRED"},
+ {name: "green1", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id: 3,
+ right: 1,
+ left: 2,
+ pop: "hash-join",
+ join-type: "LEFT",
+ join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json b/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json
new file mode 100644
index 0000000..fd680d0
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json
@@ -0,0 +1,66 @@
+{
+ "head" : {
+ "version" : 1,
+ "generator" : {
+ "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+ "info" : ""
+ },
+ "type" : "APACHE_DRILL_PHYSICAL",
+ "resultMode" : "EXEC"
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "file:///"},
+ files:["#{TEST_FILE_1}"]
+ },
+ {
+ @id:2,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "file:///"},
+ files:["#{TEST_FILE_2}"]
+ },
+ {
+ "pop" : "project",
+ "@id" : 3,
+ "exprs" : [ {
+ "ref" : "output.A",
+ "expr" : "A"
+ },
+ { "ref" : "output.CCOL", "expr" : "C" }
+ ],
+
+ "child" : 1
+ },
+ {
+ "pop" : "project",
+ "@id" : 4,
+ "exprs" : [ {
+ "ref" : "output.B",
+ "expr" : "B"
+ },
+ { "ref" : "output.DCOL", "expr" : "D" }
+ ],
+
+ "child" : 2
+ },
+ {
+ @id: 5,
+ right: 3,
+ left: 4,
+ pop: "hash-join",
+ join-conditions: [
+ {relationship: "==", left: "B", right: "A"},
+ {relationship: "==", left: "DCOL", right: "CCOL"}
+ ]
+ },
+ {
+ @id: 6,
+ child: 5,
+ pop: "screen"
+ }
+ ]
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json b/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json
new file mode 100644
index 0000000..b8723aa
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json
@@ -0,0 +1,48 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://source1.apache.org",
+ entries:[
+ {records: 100000, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "INT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ pop:"mock-sub-scan",
+ url: "http://source2.apache.org",
+ entries:[
+ {records: 1, types: [
+ {name: "blue1", type: "INT", mode: "REQUIRED"},
+ {name: "red1", type: "INT", mode: "REQUIRED"},
+ {name: "green1", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id: 3,
+ right: 1,
+ left: 2,
+ pop: "hash-join",
+ join-type: "RIGHT",
+ join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/probe_side_input.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/probe_side_input.json b/exec/java-exec/src/test/resources/probe_side_input.json
new file mode 100644
index 0000000..d3dbbb3
--- /dev/null
+++ b/exec/java-exec/src/test/resources/probe_side_input.json
@@ -0,0 +1,28 @@
+{
+"B": 1,
+"D": 100
+}
+{
+"B": 2,
+"D": 200
+}
+{
+"B": 2,
+"D": 300
+}
+{
+"B": 9,
+"D": 900
+}
+{
+"B": 1,
+"D": 500
+}
+{
+"B": 10,
+"D": 1000
+}
+{
+"B": 11,
+"D": 1100
+}
\ No newline at end of file