Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/20 06:08:25 UTC

[17/17] git commit: DRILL-505: Hash Join

DRILL-505: Hash Join

Support for left outer, right outer and full joins

Support for multiple join conditions

Add the following tests:
 - Multiple condition join
 - Join on JSON scan
 - Multi batch join
 - Simple equality join


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1fc7b982
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1fc7b982
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1fc7b982

Branch: refs/heads/master
Commit: 1fc7b982414bc0dcd29b1d31e312d2207971933a
Parents: c7cb7ba
Author: Mehant Baid <me...@gmail.com>
Authored: Tue Feb 25 01:09:06 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 19 21:07:29 2014 -0700

----------------------------------------------------------------------
 .../physical/base/AbstractPhysicalVisitor.java  |   5 +
 .../exec/physical/base/PhysicalVisitor.java     |   1 +
 .../drill/exec/physical/config/HashJoinPOP.java | 143 ++++++++
 .../physical/impl/common/ChainedHashTable.java  |  40 ++-
 .../exec/physical/impl/common/HashTable.java    |   2 +-
 .../physical/impl/common/HashTableTemplate.java |  11 +-
 .../exec/physical/impl/join/HashJoinBatch.java  | 355 +++++++++++++++++++
 .../impl/join/HashJoinBatchCreator.java         |  37 ++
 .../exec/physical/impl/join/HashJoinHelper.java | 222 ++++++++++++
 .../exec/physical/impl/join/HashJoinProbe.java  |  56 +++
 .../impl/join/HashJoinProbeTemplate.java        | 226 ++++++++++++
 .../exec/physical/impl/join/TestHashJoin.java   | 216 +++++++++++
 .../apache/drill/exec/pop/PopUnitTestBase.java  |   2 +-
 .../src/test/resources/build_side_input.json    |  24 ++
 .../src/test/resources/join/hash_join.json      |  63 ++++
 .../resources/join/hash_join_multi_batch.json   |  47 +++
 .../join/hj_left_outer_multi_batch.json         |  48 +++
 .../resources/join/hj_multi_condition_join.json |  66 ++++
 .../join/hj_right_outer_multi_batch.json        |  48 +++
 .../src/test/resources/probe_side_input.json    |  28 ++
 20 files changed, 1618 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 5f37487..abafc42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -106,6 +106,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
+  public T visitHashJoin(HashJoinPOP join, X value) throws E {
+    return visitOp(join, value);
+  }
+
+  @Override
   public T visitHashPartitionSender(HashPartitionSender op, X value) throws E {
     return visitSender(op, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 712fafe..208dab9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -41,6 +41,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
   public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP;
   public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
+  public RETURN visitHashJoin(HashJoinPOP join, EXTRA value) throws EXCEP;
   public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
   public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
   public RETURN visitStreamingAggregate(StreamingAggregate agg, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
new file mode 100644
index 0000000..f4a1fc7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+
+import org.eigenbase.rel.JoinRelType;
+
+@JsonTypeName("hash-join")
+public class HashJoinPOP extends AbstractBase {
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class);
+
+
+    private final PhysicalOperator left;
+    private final PhysicalOperator right;
+    private final List<JoinCondition> conditions;
+    private final JoinRelType joinType;
+    private final HashTableConfig htConfig;
+
+    @Override
+    public OperatorCost getCost() {
+        return new OperatorCost(0,0,0,0);
+    }
+
+    @JsonCreator
+    public HashJoinPOP(
+            @JsonProperty("left") PhysicalOperator left,
+            @JsonProperty("right") PhysicalOperator right,
+            @JsonProperty("join-conditions") List<JoinCondition> conditions,
+            @JsonProperty("join-type") JoinRelType joinType
+    ) {
+        this.left = left;
+        this.right = right;
+        this.conditions = conditions;
+        this.joinType = joinType;
+
+        int conditionsSize = conditions.size();
+
+        NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
+        NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+
+        for (int i = 0; i < conditionsSize; i++) {
+            rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
+            leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+
+            // Hash join only supports equality currently.
+            assert conditions.get(i).getRelationship().equals("==");
+        }
+
+        this.htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY,
+                                            HashTable.DEFAULT_LOAD_FACTOR,
+                                            rightExpr, leftExpr);
+    }
+
+    @Override
+    public Size getSize() {
+        return left.getSize().add(right.getSize());
+    }
+
+    @Override
+    public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+        return physicalVisitor.visitHashJoin(this, value);
+    }
+
+    @Override
+    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+        Preconditions.checkArgument(children.size() == 2);
+        return new HashJoinPOP(children.get(0), children.get(1), conditions, joinType);
+    }
+
+    @Override
+    public Iterator<PhysicalOperator> iterator() {
+        return Iterators.forArray(left, right);
+    }
+
+    public PhysicalOperator getLeft() {
+        return left;
+    }
+
+    public PhysicalOperator getRight() {
+        return right;
+    }
+
+    public JoinRelType getJoinType() {
+        return joinType;
+    }
+
+    public List<JoinCondition> getConditions() {
+        return conditions;
+    }
+
+    public HashTableConfig getHtConfig() {
+        return htConfig;
+    }
+
+    public HashJoinPOP flipIfRight(){
+        if(joinType == JoinRelType.RIGHT){
+            List<JoinCondition> flippedConditions = Lists.newArrayList(conditions.size());
+            for(JoinCondition c : conditions){
+                flippedConditions.add(c.flip());
+            }
+            return new HashJoinPOP(right, left, flippedConditions, JoinRelType.LEFT);
+        }else{
+            return this;
+        }
+    }
+}
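
The flipIfRight() method above normalizes a RIGHT join into a LEFT join by swapping the two inputs and flipping each condition; the result is equivalent because a right outer join preserves exactly the unmatched rows of its right input, and the hash join always builds its hash table on the right input. A minimal plain-Java sketch of that normalization (FlipDemo, Condition and JoinType are illustrative stand-ins, not Drill types):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FlipDemo {
        enum JoinType { INNER, LEFT, RIGHT, FULL }

        static class Condition {
            final String left, right;
            Condition(String left, String right) { this.left = left; this.right = right; }
            Condition flip() { return new Condition(right, left); } // swap the two sides
            public String toString() { return left + " == " + right; }
        }

        // Mirrors HashJoinPOP.flipIfRight(): RIGHT becomes LEFT with inputs swapped.
        static Object[] flipIfRight(String left, String right, List<Condition> conds, JoinType type) {
            if (type != JoinType.RIGHT) return new Object[]{left, right, conds, type};
            List<Condition> flipped = new ArrayList<>();
            for (Condition c : conds) flipped.add(c.flip());
            return new Object[]{right, left, flipped, JoinType.LEFT};
        }

        public static void main(String[] args) {
            Object[] j = flipIfRight("probeScan", "buildScan",
                    Arrays.asList(new Condition("l.id", "r.id")), JoinType.RIGHT);
            System.out.println(Arrays.toString(j)); // [buildScan, probeScan, [r.id == l.id], LEFT]
        }
    }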

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 507be33..ec579fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -84,6 +84,7 @@ public class ChainedHashTable {
   private final MappingSet KeyMatchIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, KEY_MATCH_BUILD, KEY_MATCH_BUILD);
   private final MappingSet KeyMatchIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, KEY_MATCH_PROBE, KEY_MATCH_PROBE);
   private final MappingSet KeyMatchHtableMapping = new MappingSet("htRowIdx", null, "htContainer", null, KEY_MATCH_BUILD, KEY_MATCH_BUILD);
+  private final MappingSet KeyMatchHtableProbeMapping = new MappingSet("htRowIdx", null, "htContainer", null, KEY_MATCH_PROBE, KEY_MATCH_PROBE);
   private final MappingSet GetHashIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, GET_HASH_BUILD, GET_HASH_BUILD);
   private final MappingSet GetHashIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, GET_HASH_PROBE, GET_HASH_PROBE);
   private final MappingSet SetValueMapping = new MappingSet("incomingRowIdx" /* read index */, "htRowIdx" /* write index */, "incomingBuild" /* read container */, "htContainer" /* write container */, SET_VALUE, SET_VALUE);
@@ -114,10 +115,6 @@ public class ChainedHashTable {
     ClassGenerator<HashTable> cg = top.getRoot();
     ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder");
 
-    if (outKeyFieldIds.length > htConfig.getKeyExprsBuild().length) {
-      throw new IllegalArgumentException("Mismatched number of output key fields.");
-    }
-
     LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().length];
     LogicalExpression[] keyExprsProbe = null;
     boolean isProbe = (htConfig.getKeyExprsProbe() != null) ;
@@ -146,27 +143,34 @@ public class ChainedHashTable {
       i++;
     }
 
-    if (isProbe) { 
+    if (isProbe) {
+      i = 0;
       for (NamedExpression ne : htConfig.getKeyExprsProbe()) { 
         final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry());
         if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
         if (expr == null) continue;
         keyExprsProbe[i] = expr;
+        i++;
       }
     }
 
     // generate code for isKeyMatch(), setValue(), getHash() and outputRecordKeys()
-
     setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild, htKeyFieldIds);
-    setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableMapping, keyExprsProbe, htKeyFieldIds) ;
+    setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableProbeMapping, keyExprsProbe, htKeyFieldIds) ;
 
     setupSetValue(cgInner, keyExprsBuild, htKeyFieldIds);
-    setupOutputRecordKeys(cgInner, htKeyFieldIds, outKeyFieldIds);    
+    if (outgoing != null) {
+
+      if (outKeyFieldIds.length > htConfig.getKeyExprsBuild().length) {
+        throw new IllegalArgumentException("Mismatched number of output key fields.");
+      }
+    }
+    setupOutputRecordKeys(cgInner, htKeyFieldIds, outKeyFieldIds);
 
     setupGetHash(cg /* use top level code generator for getHash */,  GetHashIncomingBuildMapping, keyExprsBuild);
     setupGetHash(cg /* use top level code generator for getHash */,  GetHashIncomingProbeMapping, keyExprsProbe);
 
-    HashTable ht = context.getImplementationClass(top); 
+    HashTable ht = context.getImplementationClass(top);
     ht.setup(htConfig, context, incomingBuild, incomingProbe, outgoing, htContainerOrig);
 
     return ht;
@@ -227,14 +231,18 @@ public class ChainedHashTable {
 
     cg.setMappingSet(OutputRecordKeysMapping);
 
-    for (int i = 0; i < outKeyFieldIds.length; i++) {
-      ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]);
-      ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, true);
-      HoldingContainer hc = cg.addExpr(vvwExpr);
-      cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
-    }
+    if (outKeyFieldIds != null) {
+      for (int i = 0; i < outKeyFieldIds.length; i++) {
+        ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]);
+        ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, true);
+        HoldingContainer hc = cg.addExpr(vvwExpr);
+        cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+      }
 
-    cg.getEvalBlock()._return(JExpr.TRUE);
+      cg.getEvalBlock()._return(JExpr.TRUE);
+    } else {
+      cg.getEvalBlock()._return(JExpr.FALSE);
+    }
   }
 
   private void setupGetHash(ClassGenerator<HashTable> cg, MappingSet incomingMapping, LogicalExpression[] keyExprs) throws SchemaChangeException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index d9321b9..2f1172a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -49,7 +49,7 @@ public interface HashTable {
 
   public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder);
   
-  public int containsKey(int incomingRowIdx);
+  public int containsKey(int incomingRowIdx, boolean isProbe);
 
   public int size();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 775766d..f67939e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -371,11 +371,13 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   public void clear() {
-    for (BatchHolder bh : batchHolders) {
-      bh.clear();
+    if (batchHolders != null) {
+      for (BatchHolder bh : batchHolders) {
+        bh.clear();
+      }
+      batchHolders.clear();
+      batchHolders = null;
     }
-    batchHolders.clear();
-    batchHolders = null;
     startIndices.clear();
     currentIdxHolder = null;
     numEntries = 0;
@@ -486,6 +488,7 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
+  @Override
   public int containsKey(int incomingRowIdx, boolean isProbe) {
     int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx);
     int i = getBucketIndex(hash, numBuckets());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
new file mode 100644
index 0000000..7ada651
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.record.*;
+import org.eigenbase.rel.JoinRelType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
+import com.sun.codemodel.JExpr;
+
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+    // Probe side record batch
+    private final RecordBatch left;
+
+    // Build side record batch
+    private final RecordBatch right;
+
+    // Join type: INNER, LEFT, RIGHT or FULL
+    private final JoinRelType joinType;
+
+    // hash table configuration, created in HashJoinPOP
+    private HashTableConfig htConfig;
+
+    // Runtime generated class implementing HashJoinProbe interface
+    private HashJoinProbe hashJoinProbe = null;
+
+    /* Helper class
+     * Maintains linked list of build side records with the same key
+     * Keeps track of which build records have a corresponding
+     * matching key on the probe side (for right outer and full outer joins)
+     */
+    private HashJoinHelper hjHelper = null;
+
+    // Underlying hashtable used by the hash join
+    private HashTable hashTable = null;
+
+    /* Hyper container to store all build side record batches.
+     * Records are retrieved from this container when there is a matching record
+     * on the probe side
+     */
+    private ExpandableHyperContainer hyperContainer;
+
+    // Number of records in the output container
+    private int outputRecords;
+
+    // Current batch index on the build side
+    private int buildBatchIndex = 0;
+
+    // List of vector allocators
+    private List<VectorAllocator> allocators = null;
+
+    // Schema of the build side
+    private BatchSchema rightSchema = null;
+
+    // Generator mapping for the build side
+    private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
+                                                                                  "projectBuildRecord" /* eval method */,
+                                                                                  null /* reset */, null /* cleanup */);
+
+    // Generator mapping for the probe side
+    private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
+                                                                                  "projectProbeRecord" /* eval method */,
+                                                                                  null /* reset */, null /* cleanup */);
+
+    // Mapping set for the build side
+    private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
+                                                                  "buildBatch" /* read container */,
+                                                                  "outgoing" /* write container */,
+                                                                  PROJECT_BUILD, PROJECT_BUILD);
+
+    // Mapping set for the probe side
+    private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+                                                                  "probeBatch" /* read container */,
+                                                                  "outgoing" /* write container */,
+                                                                  PROJECT_PROBE, PROJECT_PROBE);
+
+    @Override
+    public int getRecordCount() {
+        return outputRecords;
+    }
+
+
+    @Override
+    public IterOutcome next() {
+
+        try {
+            /* If we are here for the first time, execute the build phase of the
+             * hash join and setup the run time generated class for the probe side
+             */
+            if (hashJoinProbe == null) {
+
+                // Initialize the hash join helper context
+                hjHelper = new HashJoinHelper(context);
+
+                /* Build phase requires setting up the hash table. Hash table will
+                 * materialize both the build and probe side expressions while
+                 * creating the hash table. So we need to invoke next() on our probe batch
+                 * as well, for the materialization to be successful. This batch will not be used
+                 * till we complete the build phase.
+                 */
+                left.next();
+
+                // Build the hash table, using the build side record batches.
+                executeBuildPhase();
+
+                // Create the run time generated code needed to probe and project
+                hashJoinProbe = setupHashJoinProbe();
+            }
+
+            // Allocate the memory for the vectors in the output container
+            allocateVectors();
+
+            // Store the number of records projected
+            outputRecords = hashJoinProbe.probeAndProject();
+
+            /* We are here because of one of the following:
+             * 1. We have completed processing all the records and we are done
+             * 2. We've filled up the outgoing batch to the maximum and need to return upstream
+             * In either case, build the output container's schema and return
+             */
+            if (outputRecords > 0) {
+
+                // Build the container schema and set the counts
+                container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+                container.setRecordCount(outputRecords);
+
+                for (VectorWrapper<?> v : container) {
+                    v.getValueVector().getMutator().setValueCount(outputRecords);
+                }
+
+                return IterOutcome.OK_NEW_SCHEMA;
+            }
+
+            // No more output records, clean up and return
+            cleanup();
+            return IterOutcome.NONE;
+
+        } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+            context.fail(e);
+            killIncoming();
+            cleanup();
+            return IterOutcome.STOP;
+        }
+    }
+
+    public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+
+        // Shouldn't be recreating the hash table; this should be done only once
+        assert hashTable == null;
+
+        ChainedHashTable ht  = new ChainedHashTable(htConfig, context, this.right, this.left, null);
+        hashTable = ht.createAndSetupHashTable(null);
+
+    }
+
+    public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
+
+        // Set up the underlying hash table
+        IterOutcome rightUpstream = right.next();
+
+        boolean moreData = true;
+
+        setupHashTable();
+
+        while (moreData) {
+
+            switch (rightUpstream) {
+
+                case NONE:
+                case NOT_YET:
+                case STOP:
+                    moreData = false;
+                    continue;
+
+                case OK_NEW_SCHEMA:
+                    if (rightSchema == null) {
+                        rightSchema = right.getSchema();
+                    } else {
+                        throw new SchemaChangeException("Hash join does not support schema changes");
+                    }
+                // Fall through
+                case OK:
+                    int currentRecordCount = right.getRecordCount();
+
+                    /* For every new build batch, we store some state in the
+                     * helper context; add the state for this new batch
+                     */
+                    hjHelper.addNewBatch(currentRecordCount);
+
+                    // Holder contains the global index where the key is hashed into using the hash table
+                    IntHolder htIndex = new IntHolder();
+
+                    // For every record in the build batch, hash the key columns
+                    for (int i = 0; i < currentRecordCount; i++) {
+
+                        HashTable.PutStatus status = hashTable.put(i, htIndex);
+
+                        if (status != HashTable.PutStatus.PUT_FAILED) {
+                            /* Use the global index returned by the hash table, to store
+                             * the current record index and batch index. This will be used
+                             * later when we probe and find a match.
+                             */
+                            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+                        }
+                    }
+
+                    /* Completed hashing all records in this batch. Transfer the batch
+                     * to the hyper vector container. Will be used when we want to retrieve
+                     * records that have matching keys on the probe side.
+                     */
+                    RecordBatchData nextBatch = new RecordBatchData(right);
+                    if (hyperContainer == null) {
+                        hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+                    } else {
+                        hyperContainer.addBatch(nextBatch.getContainer());
+                    }
+
+                    // completed processing a batch, increment batch index
+                    buildBatchIndex++;
+                    break;
+            }
+            // Get the next record batch
+            rightUpstream = right.next();
+        }
+    }
+
+    public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+
+        allocators = new ArrayList<>();
+
+        final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+        ClassGenerator<HashJoinProbe> g = cg.getRoot();
+
+        // Generate the code to project build side records
+        g.setMappingSet(projectBuildMapping);
+
+
+        int fieldId = 0;
+        JExpression buildIndex = JExpr.direct("buildIndex");
+        JExpression outIndex = JExpr.direct("outIndex");
+        g.rotateBlock();
+        for(VectorWrapper<?> vv : hyperContainer) {
+
+            // Add the vector to our output container
+            ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator());
+            container.add(v);
+            allocators.add(RemovingRecordBatch.getAllocator4(v));
+
+            JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true));
+            JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+
+            g.getEvalBlock().add(outVV.invoke("copyFrom")
+                    .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+                    .arg(outIndex)
+                    .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
+
+            fieldId++;
+        }
+
+        // Generate the code to project probe side records
+        g.setMappingSet(projectProbeMapping);
+
+        int outputFieldId = fieldId;
+        fieldId = 0;
+        JExpression probeIndex = JExpr.direct("probeIndex");
+        for (VectorWrapper<?> vv : left) {
+
+            ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator());
+            container.add(v);
+            allocators.add(RemovingRecordBatch.getAllocator4(v));
+
+            JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false));
+            JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false));
+
+            g.getEvalBlock().add(outVV.invoke("copyFrom").arg(probeIndex).arg(outIndex).arg(inVV));
+
+            fieldId++;
+            outputFieldId++;
+        }
+
+        HashJoinProbe hj = context.getImplementationClass(cg);
+        hj.setupHashJoinProbe(context, hyperContainer, left, this, hashTable, hjHelper, joinType);
+        return hj;
+    }
+
+    private void allocateVectors(){
+        for(VectorAllocator a : allocators){
+            a.alloc(RecordBatch.MAX_BATCH_SIZE);
+        }
+    }
+
+    public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
+        super(popConfig, context);
+        this.left = left;
+        this.right = right;
+        this.joinType = popConfig.getJoinType();
+        this.htConfig = popConfig.getHtConfig();
+    }
+
+    @Override
+    public void killIncoming() {
+        this.left.kill();
+        this.right.kill();
+        cleanup();
+    }
+
+    @Override
+    public void cleanup() {
+        left.cleanup();
+        right.cleanup();
+        hyperContainer.clear();
+        hjHelper.clear();
+        container.clear();
+        hashTable.clear();
+        super.cleanup();
+    }
+}
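
The generated copy in setupHashJoinProbe() above decodes a composite build index: the low 16 bits (buildIndex & Character.MAX_VALUE) select the record within a batch, and the high 16 bits (buildIndex >>> 16) select the batch within the hyper container. A self-contained sketch of the encoding, reusing the same constants (CompositeIndexDemo is illustrative, not part of the patch):

    public class CompositeIndexDemo {
        static final int SHIFT_SIZE = 16;                  // matches HashJoinHelper.SHIFT_SIZE
        static final int BATCH_MASK = Character.MAX_VALUE; // 0xFFFF: low 16 bits

        // Pack (batchIndex, recordIndex) into one int, as the SV4 entries do.
        static int compose(int batchIndex, int recordIndex) {
            return (batchIndex << SHIFT_SIZE) | (recordIndex & BATCH_MASK);
        }

        public static void main(String[] args) {
            int composite = compose(3, 42);
            int batchIdx  = composite >>> SHIFT_SIZE; // 3: which build batch
            int recordIdx = composite & BATCH_MASK;   // 42: row within that batch
            System.out.println(batchIdx + ", " + recordIdx);
        }
    }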

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
new file mode 100644
index 0000000..19a4a29
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
+
+    @Override
+    public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+        Preconditions.checkArgument(children.size() == 2);
+        return new HashJoinBatch(config, context, children.get(0), children.get(1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
new file mode 100644
index 0000000..e0098b1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+
+
+/*
+ * Helper class for hash join. Keeps track of information about the build side batches.
+ *
+ * Hash join is a blocking operator, so we consume all the batches on the build side and
+ * store them in a hyper container. The way we can retrieve records from the hyper container
+ * is by providing the record index and batch index in the hyper container. When we invoke put()
+ * for a given row, the hash table returns a global index. We store the current row's record index
+ * and batch index at that global index in the startIndices structure.
+ *
+ * Since there can be many rows with the same key on the build side, we store the first
+ * index in the startIndices list and the remaining are stored as a logical linked list using
+ * the 'links' field in the BuildInfo structures.
+ *
+ * Apart from the indexes into the hyper container, this class also stores information about
+ * which records of the build side had a matching record on the probe side. Stored in a bitvector
+ * keyMatchBitVector, it is used to retrieve all records that did not match any record on the probe side
+ * for right outer and full outer joins
+ */
+public class HashJoinHelper {
+
+    /* List of start indexes. Stores the record and batch index of the first record
+     * with a given key.
+     */
+    List<SelectionVector4> startIndices = new ArrayList<>();
+
+    // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
+    List<BuildInfo> buildInfoList = new ArrayList<>();
+
+    // Fragment context
+    FragmentContext context;
+
+    // Constant to indicate index is empty.
+    static final int INDEX_EMPTY = -1;
+
+    // bits to shift while obtaining batch index from SV4
+    static final int SHIFT_SIZE = 16;
+
+    public HashJoinHelper(FragmentContext context) {
+        this.context = context;
+    }
+
+    public void addStartIndexBatch() throws SchemaChangeException {
+        startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
+    }
+
+    public class BuildInfo {
+        // List of links. Logically it helps maintain a linked list of records with the same key value
+        private SelectionVector4 links;
+
+        // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
+        private BitSet keyMatchBitVector;
+
+        // number of records in this batch
+        private int recordCount;
+
+        public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
+            this.links = links;
+            this.keyMatchBitVector = keyMatchBitVector;
+            this.recordCount = recordCount;
+        }
+
+        public SelectionVector4 getLinks() {
+            return links;
+        }
+
+        public BitSet getKeyMatchBitVector() {
+            return keyMatchBitVector;
+        }
+    }
+
+    public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
+
+        ByteBuf vector = context.getAllocator().buffer((recordCount * 4));
+
+        SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
+
+        // Initialize the vector
+        for (int i = 0; i < recordCount; i++) {
+            sv4.set(i, INDEX_EMPTY);
+        }
+
+        return sv4;
+    }
+
+    public void addNewBatch(int recordCount) throws SchemaChangeException {
+        // Add a node to the list of BuildInfo's
+        BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
+        buildInfoList.add(info);
+    }
+
+    public int getStartIndex(int keyIndex) {
+        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
+        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+        assert batchIdx < startIndices.size();
+
+        SelectionVector4 sv4 = startIndices.get(batchIdx);
+
+        return sv4.get(offsetIdx);
+    }
+
+    public int getNextIndex(int currentIdx) {
+        // Get to the links field of the current index to get the next index
+        int batchIdx = currentIdx >>> SHIFT_SIZE;
+        int recordIdx = currentIdx & HashTable.BATCH_MASK;
+
+        assert batchIdx < buildInfoList.size();
+
+        // Get the corresponding BuildInfo node
+        BuildInfo info = buildInfoList.get(batchIdx);
+        return info.getLinks().get(recordIdx);
+    }
+
+    public List<Integer> getNextUnmatchedIndex() {
+        List<Integer> compositeIndexes = new ArrayList<>();
+
+        for (int i = 0; i < buildInfoList.size(); i++) {
+            BuildInfo info = buildInfoList.get(i);
+            int fromIndex = 0;
+
+            while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
+                compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
+                fromIndex++;
+            }
+        }
+        return compositeIndexes;
+    }
+
+    public void setRecordMatched(int index) {
+        int batchIdx  = index >>> SHIFT_SIZE;
+        int recordIdx = index & HashTable.BATCH_MASK;
+
+        // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
+        BuildInfo info = buildInfoList.get(batchIdx);
+        BitSet bitVector = info.getKeyMatchBitVector();
+
+        bitVector.set(recordIdx);
+    }
+
+    public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
+
+        /* set the current record batch index and the index
+         * within the batch at the specified keyIndex. The keyIndex
+         * denotes the global index where the key for this record is
+         * stored in the hash table
+         */
+        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
+        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+        if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
+            // allocate a new batch
+            addStartIndexBatch();
+        }
+
+        SelectionVector4 startIndex = startIndices.get(batchIdx);
+        int linkIndex;
+
+        // If it's the first value for this key
+        if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
+            startIndex.set(offsetIdx, batchIndex, recordIndex);
+        } else {
+            /* we already have encountered a record with the same key
+             * use links to store this value
+             */
+            SelectionVector4 link;
+            do {
+                // Traverse the links to find an empty slot for the current index
+                batchIdx  = linkIndex >>> SHIFT_SIZE;
+                offsetIdx = linkIndex & Character.MAX_VALUE;
+
+                // get the next link
+                link = buildInfoList.get(batchIdx).getLinks();
+            } while ((linkIndex = link.get(offsetIdx)) != INDEX_EMPTY);
+
+            // We have the correct batchIdx and offset within the batch to store the next link
+            link.set(offsetIdx, batchIndex, recordIndex);
+        }
+    }
+
+    public void clear() {
+        // Clear the SV4 used for start indices
+        for (SelectionVector4 sv4: startIndices) {
+            sv4.clear();
+        }
+
+        for (BuildInfo info : buildInfoList) {
+            info.getLinks().clear();
+        }
+    }
+}
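
The linked-list scheme described in the class comment above can be sketched with plain arrays: a start-index slot holds the first record for a key, and a per-record links entry points to the next record with the same key, terminated by INDEX_EMPTY. A simplified single-batch, single-key illustration (LinkChainDemo and its arrays are hypothetical stand-ins for the SelectionVector4 structures):

    import java.util.Arrays;

    public class LinkChainDemo {
        static final int INDEX_EMPTY = -1;

        public static void main(String[] args) {
            // Three build records whose keys all hashed to hash-table slot 0.
            int[] startIndex = {INDEX_EMPTY}; // first record per hash-table slot
            int[] links = new int[3];         // next record with the same key
            Arrays.fill(links, INDEX_EMPTY);

            // Insert records 0, 1, 2 for slot 0, as setCurrentIndex() does.
            for (int record = 0; record < 3; record++) {
                if (startIndex[0] == INDEX_EMPTY) {
                    startIndex[0] = record;   // first value for this key
                } else {
                    int i = startIndex[0];
                    while (links[i] != INDEX_EMPTY) i = links[i]; // walk to chain end
                    links[i] = record;
                }
            }

            // The probe side walks the chain: getStartIndex(), then getNextIndex().
            for (int i = startIndex[0]; i != INDEX_EMPTY; i = links[i])
                System.out.println("matching build record " + i);
        }
    }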

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
new file mode 100644
index 0000000..c99f2a6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+import org.eigenbase.rel.JoinRelType;
+
+import java.io.IOException;
+
+public interface HashJoinProbe {
+    public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+
+    /* The probe side of the hash join can be in one of the following three states:
+     * 1. PROBE_PROJECT: We probe our hash table to see if we have a
+     *    key match and, if we do, we project the record
+     * 2. PROJECT_RIGHT: For right outer or full outer joins, we project the records
+     *    from the build side that did not match any record on the probe side. The left
+     *    outer case is handled internally by projecting the probe record if there isn't a match on the build side
+     * 3. DONE: Once we have projected all possible records we are done
+     */
+    public static enum ProbeState {
+        PROBE_PROJECT, PROJECT_RIGHT, DONE
+    }
+
+    public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+                                            RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, JoinRelType joinRelType);
+    public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
+    public abstract int  probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
+    public abstract void projectBuildRecord(int buildIndex, int outIndex);
+    public abstract void projectProbeRecord(int probeIndex, int outIndex);
+}
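
TEMPLATE_DEFINITION above follows Drill's code-generation pattern: the hand-written driver logic lives in HashJoinProbeTemplate, while methods such as doSetup(), projectBuildRecord() and projectProbeRecord() are generated and compiled at runtime with schema-specific vector copies. A plain-Java analogue of the pattern, with an anonymous class standing in for the generated subclass (ProbeTemplate and TemplateDemo are illustrative names only):

    public class TemplateDemo {
        static abstract class ProbeTemplate {
            // Hand-written driver, analogous to probeAndProject().
            int projectAll(int count) {
                for (int i = 0; i < count; i++) project(i, i);
                return count;
            }
            // Generated per schema at runtime in the real operator.
            protected abstract void project(int inIndex, int outIndex);
        }

        public static void main(String[] args) {
            ProbeTemplate generated = new ProbeTemplate() { // stands in for the compiled class
                protected void project(int in, int out) {
                    System.out.println("copy row " + in + " -> " + out);
                }
            };
            System.out.println(generated.projectAll(2) + " records projected");
        }
    }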

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
new file mode 100644
index 0000000..cc1a257
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.sql2rel.StandardConvertletTable;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class HashJoinProbeTemplate implements HashJoinProbe {
+
+    // Probe side record batch
+    private RecordBatch probeBatch;
+
+    // Join type: INNER, LEFT, RIGHT or FULL
+    private JoinRelType joinType;
+
+    /* Helper class
+     * Maintains linked list of build side records with the same key
+     * Keeps track of which build records have a corresponding
+     * matching key on the probe side (for right outer and full outer joins)
+     */
+    private HashJoinHelper hjHelper = null;
+
+    // Underlying hashtable used by the hash join
+    private HashTable hashTable = null;
+
+    // Number of records to process on the probe side
+    private int recordsToProcess = 0;
+
+    // Number of records processed on the probe side
+    private int recordsProcessed = 0;
+
+    // Number of records in the output container
+    private int outputRecords;
+
+    // Indicate if we should drain the next record from the probe side
+    private boolean getNextRecord = true;
+
+    // Contains both batch idx and record idx of the matching record in the build side
+    private int currentCompositeIdx = -1;
+
+    // Current state the hash join algorithm is in
+    private ProbeState probeState = ProbeState.PROBE_PROJECT;
+
+    // For right outer and full outer joins, this is a list of unmatched build records that need to be projected
+    private List<Integer> unmatchedBuildIndexes = null;
+
+    @Override
+    public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+                                   RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
+                                   JoinRelType joinRelType) {
+
+        this.probeBatch = probeBatch;
+        this.joinType = joinRelType;
+        this.recordsToProcess = probeBatch.getRecordCount();
+        this.hashTable = hashTable;
+        this.hjHelper = hjHelper;
+
+        doSetup(context, buildBatch, probeBatch, outgoing);
+    }
+
+    public void executeProjectRightPhase() {
+        while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
+            projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
+        }
+    }
+
+    public void executeProbePhase() throws SchemaChangeException {
+        while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+
+            // If we have processed all records in this batch, invoke next() to get the next batch
+            if (recordsProcessed == recordsToProcess) {
+                IterOutcome leftUpstream = probeBatch.next();
+
+                switch (leftUpstream) {
+                    case NONE:
+                    case NOT_YET:
+                    case STOP:
+                        recordsProcessed = 0;
+                        recordsToProcess = 0;
+                        probeState = ProbeState.DONE;
+
+                        // We are done with the probe phase. If it's a RIGHT or a FULL join, get the unmatched indexes from the build side
+                        if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
+                            probeState = ProbeState.PROJECT_RIGHT;
+                        }
+
+                        continue;
+
+                    case OK_NEW_SCHEMA:
+                        throw new SchemaChangeException("Hash join does not support schema changes");
+                    case OK:
+                        recordsToProcess = probeBatch.getRecordCount();
+                        recordsProcessed = 0;
+                }
+            }
+            int probeIndex;
+
+            // Check if we need to drain the next row in the probe side
+            if (getNextRecord) {
+                probeIndex = hashTable.containsKey(recordsProcessed, true);
+
+                if (probeIndex != -1) {
+
+                    /* The current probe record has a key that matches. Get the index
+                     * of the first row in the build side that matches the current key
+                     */
+                    currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
+
+                    /* Record in the build side at currentCompositeIdx has a matching record in the probe
+                     * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
+                     * join we keep track of which records we need to project at the end
+                     */
+                    hjHelper.setRecordMatched(currentCompositeIdx);
+
+                    projectBuildRecord(currentCompositeIdx, outputRecords);
+                    projectProbeRecord(recordsProcessed, outputRecords);
+                    outputRecords++;
+
+                    /* Projected single row from the build side with matching key but there
+                     * may be more rows with the same key. Check if that's the case
+                     */
+                    currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+                    if (currentCompositeIdx == -1) {
+                        /* We only had one row in the build side that matched the current key
+                         * from the probe side. Drain the next row in the probe side.
+                         */
+                        recordsProcessed++;
+                    }
+                    else {
+                        /* There is more than one row with the same key on the build side;
+                         * don't drain more records from the probe side until we have
+                         * projected all the rows with this key.
+                         */
+                        getNextRecord = false;
+                    }
+                }
+                else { // No matching key
+
+                    // For a LEFT or FULL join, project the unmatched probe-side record
+                    if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+                        projectProbeRecord(recordsProcessed, outputRecords++);
+                    }
+                    recordsProcessed++;
+                }
+            }
+            else {
+                hjHelper.setRecordMatched(currentCompositeIdx);
+                projectBuildRecord(currentCompositeIdx, outputRecords);
+                projectProbeRecord(recordsProcessed, outputRecords);
+                outputRecords++;
+
+                currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+
+                if (currentCompositeIdx == -1) {
+                    // We don't have any more rows matching the current key on the build side; move on to the next probe row
+                    getNextRecord = true;
+                    recordsProcessed++;
+                }
+            }
+        }
+    }
+
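+    /* Runs the probe phase and then, for RIGHT and FULL joins, the project-right
+     * phase; returns the number of records written to the outgoing batch.
+     */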
+    public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
+
+        outputRecords = 0;
+
+        if (probeState == ProbeState.PROBE_PROJECT) {
+            executeProbePhase();
+        }
+
+        if (probeState == ProbeState.PROJECT_RIGHT) {
+
+            // We are here because we have a RIGHT OUTER or a FULL join
+            if (unmatchedBuildIndexes == null) {
+                // Initialize list of build indexes that didn't match a record on the probe side
+                unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
+                recordsToProcess = unmatchedBuildIndexes.size();
+                recordsProcessed = 0;
+            }
+
+            // Project the list of unmatched records on the build side
+            executeProjectRightPhase();
+        }
+
+        return outputRecords;
+    }
+
+    public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
+                                 @Named("outgoing") RecordBatch outgoing);
+    public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+    public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+}
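
The template above is essentially a resumable version of the textbook hash-join
probe loop: it stops whenever the outgoing batch reaches MAX_BATCH_SIZE and picks
up where it left off on the next call to probeAndProject(). A minimal,
non-resumable sketch of the same logic, using plain java.util collections in
place of Drill's hash table, value vectors and run-time generated methods (all
names below are illustrative, not part of this patch):

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch only: a HashMap stands in for ChainedHashTable, a BitSet
    // for HashJoinHelper's matched-record bookkeeping, and (buildIndex, probeIndex)
    // pairs for the projected value vectors.
    public class HashJoinProbeSketch {

        /** Each output row is a (buildIndex, probeIndex) pair; -1 marks the null side. */
        public static List<int[]> join(int[] buildKeys, int[] probeKeys,
                                       boolean projectUnmatchedProbe,   // LEFT or FULL
                                       boolean projectUnmatchedBuild) { // RIGHT or FULL
            // Build phase: map each key to the indexes of all build rows carrying it.
            Map<Integer, List<Integer>> table = new HashMap<>();
            for (int i = 0; i < buildKeys.length; i++) {
                table.computeIfAbsent(buildKeys[i], k -> new ArrayList<>()).add(i);
            }

            BitSet matched = new BitSet(buildKeys.length);
            List<int[]> out = new ArrayList<>();

            // Probe phase (PROBE_PROJECT): one output row per matching build row.
            for (int p = 0; p < probeKeys.length; p++) {
                List<Integer> hits = table.get(probeKeys[p]);
                if (hits == null) {
                    if (projectUnmatchedProbe) {
                        out.add(new int[]{-1, p}); // unmatched probe row, LEFT/FULL only
                    }
                    continue;
                }
                for (int b : hits) {
                    matched.set(b);             // remember for the project-right phase
                    out.add(new int[]{b, p});
                }
            }

            // Project-right phase (PROJECT_RIGHT): build rows that never matched.
            if (projectUnmatchedBuild) {
                for (int b = matched.nextClearBit(0); b < buildKeys.length;
                     b = matched.nextClearBit(b + 1)) {
                    out.add(new int[]{b, -1});
                }
            }
            return out;
        }

        public static void main(String[] args) {
            // Same keys as build_side_input.json / probe_side_input.json below: an
            // inner join yields 6 rows, matching the simpleEqualityJoin test.
            int[] build = {1, 1, 2, 3, 4, 5};
            int[] probe = {1, 2, 2, 9, 1, 10, 11};
            System.out.println(join(build, probe, false, false).size()); // prints 6
        }
    }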

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
new file mode 100644
index 0000000..529563a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.List;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestHashJoin extends PopUnitTestBase {
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestHashJoin.class);
+
+    DrillConfig c = DrillConfig.create();
+
+    private void testHJMockScanCommon(final DrillbitContext bitContext, UserServer.UserClientConnection connection, String physicalPlan, int expectedRows) throws Throwable {
+        new NonStrictExpectations(){{
+            bitContext.getMetrics(); result = new MetricRegistry();
+            bitContext.getAllocator(); result = new TopLevelAllocator();
+            bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
+        }};
+
+        PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+        PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(physicalPlan), Charsets.UTF_8));
+        FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+        FragmentContext context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);
+        SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+        int totalRecordCount = 0;
+        while (exec.next()) {
+            totalRecordCount += exec.getRecordCount();
+        }
+        assertEquals(expectedRows, totalRecordCount);
+        System.out.println("Total Record Count: " + totalRecordCount);
+        if (context.getFailureCause() != null) {
+            throw context.getFailureCause();
+        }
+        assertTrue(!context.isFailed());
+    }
+
+    @Test
+    public void multiBatchEqualityJoin(@Injectable final DrillbitContext bitContext,
+                                   @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+        testHJMockScanCommon(bitContext, connection, "/join/hash_join_multi_batch.json", 200000);
+    }
+
+    @Test
+    public void multiBatchRightOuterJoin(@Injectable final DrillbitContext bitContext,
+                                         @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+        testHJMockScanCommon(bitContext, connection, "/join/hj_right_outer_multi_batch.json", 100000);
+    }
+
+    @Test
+    public void multiBatchLeftOuterJoin(@Injectable final DrillbitContext bitContext,
+                                        @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+        testHJMockScanCommon(bitContext, connection, "/join/hj_left_outer_multi_batch.json", 100000);
+    }
+
+    @Test
+    public void simpleEqualityJoin(@Injectable final DrillbitContext bitContext,
+                                   @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+        // Runs a simple equality join between the JSON build and probe side inputs
+        try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+             Drillbit bit = new Drillbit(CONFIG, serviceSet);
+             DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+            // run query.
+            bit.run();
+            client.connect();
+            List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+                    Files.toString(FileUtils.getResourceAsFile("/join/hash_join.json"), Charsets.UTF_8)
+                            .replace("#{TEST_FILE_1}", FileUtils.getResourceAsFile("/build_side_input.json").toURI().toString())
+                            .replace("#{TEST_FILE_2}", FileUtils.getResourceAsFile("/probe_side_input.json").toURI().toString()));
+
+            RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+
+            QueryResultBatch batch = results.get(0);
+            assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+
+            batchLoader.getValueAccessorById(0, IntVector.class);
+
+            Iterator<VectorWrapper<?>> itr = batchLoader.iterator();
+
+            // Just test the join key
+            long colA[] = {1, 1, 2, 2, 1, 1};
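+            // With A = {1, 1, 2, 3, 4, 5} in build_side_input.json and
+            // B = {1, 2, 2, 9, 1, 10, 11} in probe_side_input.json, probe keys
+            // 1, 2, 2 and 1 find 2 + 1 + 1 + 2 = 6 matches, in this order.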
+
+            // Check the values of the join key column
+            ValueVector.Accessor intAccessor1 = itr.next().getValueVector().getAccessor();
+
+            for (int i = 0; i < intAccessor1.getValueCount(); i++) {
+                assertEquals(intAccessor1.getObject(i), colA[i]);
+            }
+            assertEquals(6, intAccessor1.getValueCount());
+        }
+    }
+
+    @Test
+    public void multipleConditionJoin(@Injectable final DrillbitContext bitContext,
+                                      @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+        // Runs a two-condition join (B == A and DCOL == CCOL) over the same JSON inputs
+        try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+             Drillbit bit = new Drillbit(CONFIG, serviceSet);
+             DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+            // run query.
+            bit.run();
+            client.connect();
+            List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+                    Files.toString(FileUtils.getResourceAsFile("/join/hj_multi_condition_join.json"), Charsets.UTF_8)
+                            .replace("#{TEST_FILE_1}", FileUtils.getResourceAsFile("/build_side_input.json").toURI().toString())
+                            .replace("#{TEST_FILE_2}", FileUtils.getResourceAsFile("/probe_side_input.json").toURI().toString()));
+
+            RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+
+            QueryResultBatch batch = results.get(0);
+            assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+
+            batchLoader.getValueAccessorById(0, IntVector.class);
+
+            Iterator<VectorWrapper<?>> itr = batchLoader.iterator();
+
+            // Just test the two join key columns
+            long colA[] = {1, 2, 1};
+            long colC[] = {100, 200, 500};
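+            // Of the (B, D) pairs in probe_side_input.json, only (1, 100), (2, 200)
+            // and (1, 500) have a matching (A, C) row in build_side_input.json.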
+
+            // Check the values of both projected columns
+            ValueVector.Accessor intAccessor1 = itr.next().getValueVector().getAccessor();
+            ValueVector.Accessor intAccessor2 = itr.next().getValueVector().getAccessor();
+
+            for (int i = 0; i < intAccessor1.getValueCount(); i++) {
+                assertEquals(intAccessor1.getObject(i), colA[i]);
+                assertEquals(intAccessor2.getObject(i), colC[i]);
+            }
+            assertEquals(3, intAccessor1.getValueCount());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index 78f7e43..e5cd508 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -45,7 +45,7 @@ public abstract class PopUnitTestBase {
   protected static DrillConfig CONFIG;
 
   // Set a timeout unless we're debugging.
-  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(25000);
+  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(500000);
 
   @BeforeClass
   public static void setup() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/build_side_input.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/build_side_input.json b/exec/java-exec/src/test/resources/build_side_input.json
new file mode 100644
index 0000000..31006a6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/build_side_input.json
@@ -0,0 +1,24 @@
+{
+"A": 1,
+"C": 100
+}
+{
+"A": 1,
+"C": 500
+}
+{
+"A": 2,
+"C": 200
+}
+{
+"A": 3,
+"C": 300
+}
+{
+"A": 4,
+"C": 400
+}
+{
+"A": 5,
+"C": 500
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hash_join.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hash_join.json b/exec/java-exec/src/test/resources/join/hash_join.json
new file mode 100644
index 0000000..60b915b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hash_join.json
@@ -0,0 +1,63 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  graph:[
+  {
+      @id:1,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "file:///"},
+      files:["#{TEST_FILE_1}"]
+  },
+  {
+      @id:2,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "file:///"},
+      files:["#{TEST_FILE_2}"]
+  },
+  {
+      "pop" : "project",
+      "@id" : 3,
+      "exprs" : [ {
+        "ref" : "output.A",
+        "expr" : "A"
+      },
+      { "ref" : "output.CCOL", "expr" : "C" }
+      ],
+
+      "child" : 1
+  },
+  {
+      "pop" : "project",
+      "@id" : 4,
+      "exprs" : [ {
+        "ref" : "output.B",
+        "expr" : "B"
+      },
+      { "ref" : "output.DCOL", "expr" : "D" }
+      ],
+
+      "child" : 2
+  },
+      {
+        @id: 5,
+        right: 3,
+        left: 4,
+        pop: "hash-join",
+        join-conditions: [ {relationship: "==", left: "B", right: "A"} ]
+      },
+      {
+        @id: 6,
+        child: 5,
+        pop: "screen"
+      }
+    ]
+  }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json b/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json
new file mode 100644
index 0000000..7e218a4
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hash_join_multi_batch.json
@@ -0,0 +1,47 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 4, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id:2,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 100000, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id: 3,
+      right: 1,
+      left: 2,
+      pop: "hash-join",
+      join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json b/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json
new file mode 100644
index 0000000..0cb5a03
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_left_outer_multi_batch.json
@@ -0,0 +1,48 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 1, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id:2,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 100000, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id: 3,
+      right: 1,
+      left: 2,
+      pop: "hash-join",
+      join-type: "LEFT",
+      join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json b/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json
new file mode 100644
index 0000000..fd680d0
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_multi_condition_join.json
@@ -0,0 +1,66 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  graph:[
+  {
+      @id:1,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "file:///"},
+      files:["#{TEST_FILE_1}"]
+  },
+  {
+      @id:2,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "file:///"},
+      files:["#{TEST_FILE_2}"]
+  },
+  {
+      "pop" : "project",
+      "@id" : 3,
+      "exprs" : [ {
+        "ref" : "output.A",
+        "expr" : "A"
+      },
+      { "ref" : "output.CCOL", "expr" : "C" }
+      ],
+
+      "child" : 1
+  },
+  {
+      "pop" : "project",
+      "@id" : 4,
+      "exprs" : [ {
+        "ref" : "output.B",
+        "expr" : "B"
+      },
+      { "ref" : "output.DCOL", "expr" : "D" }
+      ],
+
+      "child" : 2
+  },
+      {
+        @id: 5,
+        right: 3,
+        left: 4,
+        pop: "hash-join",
+        join-conditions: [
+        {relationship: "==", left: "B", right: "A"},
+        {relationship: "==", left: "DCOL", right: "CCOL"}
+        ]
+      },
+      {
+        @id: 6,
+        child: 5,
+        pop: "screen"
+      }
+    ]
+  }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json b/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json
new file mode 100644
index 0000000..b8723aa
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/hj_right_outer_multi_batch.json
@@ -0,0 +1,48 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 100000, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id:2,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 1, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id: 3,
+      right: 1,
+      left: 2,
+      pop: "hash-join",
+      join-type: "RIGHT",
+      join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fc7b982/exec/java-exec/src/test/resources/probe_side_input.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/probe_side_input.json b/exec/java-exec/src/test/resources/probe_side_input.json
new file mode 100644
index 0000000..d3dbbb3
--- /dev/null
+++ b/exec/java-exec/src/test/resources/probe_side_input.json
@@ -0,0 +1,28 @@
+{
+"B": 1,
+"D": 100
+}
+{
+"B": 2,
+"D": 200
+}
+{
+"B": 2,
+"D": 300
+}
+{
+"B": 9,
+"D": 900
+}
+{
+"B": 1,
+"D": 500
+}
+{
+"B": 10,
+"D": 1000
+}
+{
+"B": 11,
+"D": 1100
+}
\ No newline at end of file