You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/20 05:01:50 UTC

[02/14] git commit: Fix alignment in Hash Join code

Fix alignment in Hash Join code


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cb0d46f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cb0d46f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cb0d46f6

Branch: refs/heads/master
Commit: cb0d46f69df2d89601f56b257f3f6c1d97eedc6e
Parents: 1f4276e
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 01:52:03 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Sun May 18 01:52:03 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/join/HashJoinBatch.java  | 634 +++++++++----------
 .../impl/join/HashJoinBatchCreator.java         |  10 +-
 .../exec/physical/impl/join/HashJoinHelper.java | 278 ++++----
 .../exec/physical/impl/join/HashJoinProbe.java  |  38 +-
 .../impl/join/HashJoinProbeTemplate.java        | 310 ++++-----
 5 files changed, 635 insertions(+), 635 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5eec3bb..9afc033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -59,397 +59,397 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
-    // Probe side record batch
-    private final RecordBatch left;
+  // Probe side record batch
+  private final RecordBatch left;
 
-    // Build side record batch
-    private final RecordBatch right;
+  // Build side record batch
+  private final RecordBatch right;
 
-    // Join type, INNER, LEFT, RIGHT or OUTER
-    private final JoinRelType joinType;
+  // Join type, INNER, LEFT, RIGHT or OUTER
+  private final JoinRelType joinType;
 
-    // Join conditions
-    private final List<JoinCondition> conditions;
+  // Join conditions
+  private final List<JoinCondition> conditions;
 
-    // Runtime generated class implementing HashJoinProbe interface
-    private HashJoinProbe hashJoinProbe = null;
+  // Runtime generated class implementing HashJoinProbe interface
+  private HashJoinProbe hashJoinProbe = null;
 
-    /* Helper class
-     * Maintains linked list of build side records with the same key
-     * Keeps information about which build records have a corresponding
-     * matching key in the probe side (for outer, right joins)
-     */
-    private HashJoinHelper hjHelper = null;
+  /* Helper class
+   * Maintains linked list of build side records with the same key
+   * Keeps information about which build records have a corresponding
+   * matching key in the probe side (for outer, right joins)
+   */
+  private HashJoinHelper hjHelper = null;
 
-    // Underlying hashtable used by the hash join
-    private HashTable hashTable = null;
+  // Underlying hashtable used by the hash join
+  private HashTable hashTable = null;
 
-    /* Hyper container to store all build side record batches.
-     * Records are retrieved from this container when there is a matching record
-     * on the probe side
-     */
-    private ExpandableHyperContainer hyperContainer;
+  /* Hyper container to store all build side record batches.
+   * Records are retrieved from this container when there is a matching record
+   * on the probe side
+   */
+  private ExpandableHyperContainer hyperContainer;
 
-    // Number of records in the output container
-    private int outputRecords;
+  // Number of records in the output container
+  private int outputRecords;
 
-    // Current batch index on the build side
-    private int buildBatchIndex = 0;
+  // Current batch index on the build side
+  private int buildBatchIndex = 0;
 
-    // List of vector allocators
-    private List<VectorAllocator> allocators = null;
+  // List of vector allocators
+  private List<VectorAllocator> allocators = null;
 
-    // Schema of the build side
-    private BatchSchema rightSchema = null;
+  // Schema of the build side
+  private BatchSchema rightSchema = null;
 
-    // Generator mapping for the build side
-    private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
-                                                                                  "projectBuildRecord" /* eval method */,
-                                                                                  null /* reset */, null /* cleanup */);
+  // Generator mapping for the build side
+  private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
+                                                                                "projectBuildRecord" /* eval method */,
+                                                                                null /* reset */, null /* cleanup */);
 
-    // Generator mapping for the probe side
-    private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
-                                                                                  "projectProbeRecord" /* eval method */,
-                                                                                  null /* reset */, null /* cleanup */);
+  // Generator mapping for the probe side
+  private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
+                                                                                "projectProbeRecord" /* eval method */,
+                                                                                null /* reset */, null /* cleanup */);
 
-    // Mapping set for the build side
-    private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
-                                                                  "buildBatch" /* read container */,
-                                                                  "outgoing" /* write container */,
-                                                                  PROJECT_BUILD, PROJECT_BUILD);
+  // Mapping set for the build side
+  private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
+                                                                "buildBatch" /* read container */,
+                                                                "outgoing" /* write container */,
+                                                                PROJECT_BUILD, PROJECT_BUILD);
 
-    // Mapping set for the probe side
-    private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
-                                                                  "probeBatch" /* read container */,
-                                                                  "outgoing" /* write container */,
-                                                                  PROJECT_PROBE, PROJECT_PROBE);
+  // Mapping set for the probe side
+  private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+                                                                "probeBatch" /* read container */,
+                                                                "outgoing" /* write container */,
+                                                                PROJECT_PROBE, PROJECT_PROBE);
 
-    // indicates if we have previously returned an output batch
-    boolean firstOutputBatch = true;
+  // indicates if we have previously returned an output batch
+  boolean firstOutputBatch = true;
 
-    IterOutcome leftUpstream = IterOutcome.NONE;
-
-    @Override
-    public int getRecordCount() {
-        return outputRecords;
-    }
+  IterOutcome leftUpstream = IterOutcome.NONE;
 
+  @Override
+  public int getRecordCount() {
+    return outputRecords;
+  }
 
-    @Override
-    public IterOutcome next() {
 
-        try {
-            /* If we are here for the first time, execute the build phase of the
-             * hash join and setup the run time generated class for the probe side
-             */
-            if (hashJoinProbe == null) {
+  @Override
+  public IterOutcome next() {
 
-                // Initialize the hash join helper context
-                hjHelper = new HashJoinHelper(context, oContext.getAllocator());
+    try {
+      /* If we are here for the first time, execute the build phase of the
+       * hash join and setup the run time generated class for the probe side
+       */
+      if (hashJoinProbe == null) {
 
-                /* Build phase requires setting up the hash table. Hash table will
-                 * materialize both the build and probe side expressions while
-                 * creating the hash table. So we need to invoke next() on our probe batch
-                 * as well, for the materialization to be successful. This batch will not be used
-                 * till we complete the build phase.
-                 */
-                leftUpstream = left.next();
+        // Initialize the hash join helper context
+        hjHelper = new HashJoinHelper(context, oContext.getAllocator());
 
-                // Build the hash table, using the build side record batches.
-                executeBuildPhase();
+        /* Build phase requires setting up the hash table. Hash table will
+         * materialize both the build and probe side expressions while
+         * creating the hash table. So we need to invoke next() on our probe batch
+         * as well, for the materialization to be successful. This batch will not be used
+         * till we complete the build phase.
+         */
+        leftUpstream = left.next();
 
-                // Create the run time generated code needed to probe and project
-                hashJoinProbe = setupHashJoinProbe();
-            }
+        // Build the hash table, using the build side record batches.
+        executeBuildPhase();
 
-            // Store the number of records projected
-            if (hashTable != null) {
-
-                // Allocate the memory for the vectors in the output container
-                allocateVectors();
-
-                outputRecords = hashJoinProbe.probeAndProject();
-
-                /* We are here because of one the following
-                 * 1. Completed processing of all the records and we are done
-                 * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
-                 * Either case build the output container's schema and return
-                 */
-                if (outputRecords > 0) {
-
-                  // Build the container schema and set the counts
-                  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-                  container.setRecordCount(outputRecords);
-
-                  for (VectorWrapper<?> v : container) {
-                    v.getValueVector().getMutator().setValueCount(outputRecords);
-                  }
-
-                  // First output batch, return OK_NEW_SCHEMA
-                  if (firstOutputBatch == true) {
-                    firstOutputBatch = false;
-                    return IterOutcome.OK_NEW_SCHEMA;
-                  }
-
-                  // Not the first output batch
-                  return IterOutcome.OK;
-                }
-            } else {
-                // Our build side is empty, we won't have any matches, clear the probe side
-                if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-                    for (VectorWrapper<?> wrapper : left) {
-                      wrapper.getValueVector().clear();
-                    }
-                    leftUpstream = left.next();
-                    while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-                      for (VectorWrapper<?> wrapper : left) {
-                        wrapper.getValueVector().clear();
-                      }
-                      leftUpstream = left.next();
-                    }
-                }
-            }
+        // Create the run time generated code needed to probe and project
+        hashJoinProbe = setupHashJoinProbe();
+      }
 
-            // No more output records, clean up and return
-            return IterOutcome.NONE;
+      // Store the number of records projected
+      if (hashTable != null) {
 
-        } catch (ClassTransformationException | SchemaChangeException | IOException e) {
-            context.fail(e);
-            killIncoming();
-            return IterOutcome.STOP;
-        }
-    }
+        // Allocate the memory for the vectors in the output container
+        allocateVectors();
 
-    public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+        outputRecords = hashJoinProbe.probeAndProject();
 
-        // Setup the hash table configuration object
-        int conditionsSize = conditions.size();
+        /* We are here because of one of the following:
+         * 1. Completed processing of all the records and we are done
+         * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+         * In either case, build the output container's schema and return
+         */
+        if (outputRecords > 0) {
 
-        NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
-        NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+        // Build the container schema and set the counts
+        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+        container.setRecordCount(outputRecords);
 
-        // Create named expressions from the conditions
-        for (int i = 0; i < conditionsSize; i++) {
-            rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
-            leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+        for (VectorWrapper<?> v : container) {
+          v.getValueVector().getMutator().setValueCount(outputRecords);
+        }
 
-            // Hash join only supports equality currently.
-            assert conditions.get(i).getRelationship().equals("==");
+        // First output batch, return OK_NEW_SCHEMA
+        if (firstOutputBatch == true) {
+          firstOutputBatch = false;
+          return IterOutcome.OK_NEW_SCHEMA;
         }
 
-        // Set the left named expression to be null if the probe batch is empty.
-        if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
-            leftExpr = null;
-        } else {
-          if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-            throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+          // Not the first output batch
+          return IterOutcome.OK;
+        }
+      } else {
+        // Our build side is empty, we won't have any matches, clear the probe side
+        if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+          for (VectorWrapper<?> wrapper : left) {
+            wrapper.getValueVector().clear();
+          }
+          leftUpstream = left.next();
+          while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+            for (VectorWrapper<?> wrapper : left) {
+              wrapper.getValueVector().clear();
+            }
+            leftUpstream = left.next();
           }
         }
+      }
 
-        HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
+      // No more output records, clean up and return
+      return IterOutcome.NONE;
 
-        // Create the chained hash table
-        ChainedHashTable ht  = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
-        hashTable = ht.createAndSetupHashTable(null);
+    } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+      context.fail(e);
+      killIncoming();
+      return IterOutcome.STOP;
     }
+  }
 
-    public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
-
-        //Setup the underlying hash table
-        IterOutcome rightUpstream = right.next();
-
-        boolean moreData = true;
-
-        while (moreData) {
-
-            switch (rightUpstream) {
-
-                case NONE:
-                case NOT_YET:
-                case STOP:
-                    moreData = false;
-                    continue;
-
-                case OK_NEW_SCHEMA:
-                    if (rightSchema == null) {
-                        rightSchema = right.getSchema();
-
-                        if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-                          throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
-                        }
-                        setupHashTable();
-                    } else {
-                        throw new SchemaChangeException("Hash join does not support schema changes");
-                    }
-                // Fall through
-                case OK:
-                    int currentRecordCount = right.getRecordCount();
-
-                    /* For every new build batch, we store some state in the helper context
-                     * Add new state to the helper context
-                     */
-                    hjHelper.addNewBatch(currentRecordCount);
-
-                    // Holder contains the global index where the key is hashed into using the hash table
-                    IntHolder htIndex = new IntHolder();
-
-                    // For every record in the build batch , hash the key columns
-                    for (int i = 0; i < currentRecordCount; i++) {
-
-                        HashTable.PutStatus status = hashTable.put(i, htIndex);
-
-                        if (status != HashTable.PutStatus.PUT_FAILED) {
-                            /* Use the global index returned by the hash table, to store
-                             * the current record index and batch index. This will be used
-                             * later when we probe and find a match.
-                             */
-                            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
-                        }
-                    }
-
-                    /* Completed hashing all records in this batch. Transfer the batch
-                     * to the hyper vector container. Will be used when we want to retrieve
-                     * records that have matching keys on the probe side.
-                     */
-                    RecordBatchData nextBatch = new RecordBatchData(right);
-                    if (hyperContainer == null) {
-                        hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
-                    } else {
-                        hyperContainer.addBatch(nextBatch.getContainer());
-                    }
-
-                    // completed processing a batch, increment batch index
-                    buildBatchIndex++;
-                    break;
-            }
-            // Get the next record batch
-            rightUpstream = right.next();
-        }
+  public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+
+    // Setup the hash table configuration object
+    int conditionsSize = conditions.size();
+
+    NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
+    NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+
+    // Create named expressions from the conditions
+    for (int i = 0; i < conditionsSize; i++) {
+      rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
+      leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+
+      // Hash join only supports equality currently.
+      assert conditions.get(i).getRelationship().equals("==");
     }
 
-    public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+    // Set the left named expression to be null if the probe batch is empty.
+    if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
+      leftExpr = null;
+    } else {
+      if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+        throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+      }
+    }
 
-        allocators = new ArrayList<>();
+    HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
-        final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-        ClassGenerator<HashJoinProbe> g = cg.getRoot();
+    // Create the chained hash table
+    ChainedHashTable ht  = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
+    hashTable = ht.createAndSetupHashTable(null);
+  }
 
-        // Generate the code to project build side records
-        g.setMappingSet(projectBuildMapping);
+  public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
 
+    //Setup the underlying hash table
+    IterOutcome rightUpstream = right.next();
 
-        int fieldId = 0;
-        JExpression buildIndex = JExpr.direct("buildIndex");
-        JExpression outIndex = JExpr.direct("outIndex");
-        g.rotateBlock();
+    boolean moreData = true;
 
-        if (hyperContainer != null) {
-            for(VectorWrapper<?> vv : hyperContainer) {
+    while (moreData) {
 
-                MajorType inputType = vv.getField().getType();
-                MajorType outputType;
-                if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
-                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-                } else {
-                  outputType = inputType;
-                }
+      switch (rightUpstream) {
 
-                // Add the vector to our output container
-                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
-                container.add(v);
-                allocators.add(RemovingRecordBatch.getAllocator4(v));
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          moreData = false;
+          continue;
 
-                JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
-                g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
-                  .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
-                  .arg(outIndex)
-                  .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
+        case OK_NEW_SCHEMA:
+          if (rightSchema == null) {
+            rightSchema = right.getSchema();
 
-                fieldId++;
+            if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+              throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
             }
-        }
-        g.rotateBlock();
-        g.getEvalBlock()._return(JExpr.TRUE);
+            setupHashTable();
+          } else {
+            throw new SchemaChangeException("Hash join does not support schema changes");
+          }
+        // Fall through
+        case OK:
+          int currentRecordCount = right.getRecordCount();
 
-        // Generate the code to project probe side records
-        g.setMappingSet(projectProbeMapping);
+          /* For every new build batch, we store some state in the helper context
+           * Add new state to the helper context
+           */
+          hjHelper.addNewBatch(currentRecordCount);
 
-        int outputFieldId = fieldId;
-        fieldId = 0;
-        JExpression probeIndex = JExpr.direct("probeIndex");
-        int recordCount = 0;
+          // Holder contains the global index where the key is hashed into using the hash table
+          IntHolder htIndex = new IntHolder();
 
-        if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
-            for (VectorWrapper<?> vv : left) {
+          // For every record in the build batch, hash the key columns
+          for (int i = 0; i < currentRecordCount; i++) {
 
-                MajorType inputType = vv.getField().getType();
-                MajorType outputType;
-                if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
-                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-                } else {
-                  outputType = inputType;
-                }
+            HashTable.PutStatus status = hashTable.put(i, htIndex);
 
-                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
-                container.add(v);
-                allocators.add(RemovingRecordBatch.getAllocator4(v));
+            if (status != HashTable.PutStatus.PUT_FAILED) {
+              /* Use the global index returned by the hash table, to store
+               * the current record index and batch index. This will be used
+               * later when we probe and find a match.
+               */
+              hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+            }
+          }
 
-                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
+          /* Completed hashing all records in this batch. Transfer the batch
+           * to the hyper vector container. Will be used when we want to retrieve
+           * records that have matching keys on the probe side.
+           */
+          RecordBatchData nextBatch = new RecordBatchData(right);
+          if (hyperContainer == null) {
+            hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+          } else {
+            hyperContainer.addBatch(nextBatch.getContainer());
+          }
 
-                g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+          // completed processing a batch, increment batch index
+          buildBatchIndex++;
+          break;
+      }
+      // Get the next record batch
+      rightUpstream = right.next();
+    }
+  }
 
-                fieldId++;
-                outputFieldId++;
-            }
-            g.rotateBlock();
-            g.getEvalBlock()._return(JExpr.TRUE);
+  public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+
+    allocators = new ArrayList<>();
+
+    final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    ClassGenerator<HashJoinProbe> g = cg.getRoot();
+
+    // Generate the code to project build side records
+    g.setMappingSet(projectBuildMapping);
+
+
+    int fieldId = 0;
+    JExpression buildIndex = JExpr.direct("buildIndex");
+    JExpression outIndex = JExpr.direct("outIndex");
+    g.rotateBlock();
 
-            recordCount = left.getRecordCount();
+    if (hyperContainer != null) {
+      for(VectorWrapper<?> vv : hyperContainer) {
+
+        MajorType inputType = vv.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
         }
 
+        // Add the vector to our output container
+        ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
+        container.add(v);
+        allocators.add(RemovingRecordBatch.getAllocator4(v));
 
-        HashJoinProbe hj = context.getImplementationClass(cg);
+        JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
+        JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
+        g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
+          .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+          .arg(outIndex)
+          .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
 
-        hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
-        return hj;
+        fieldId++;
+      }
     }
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.TRUE);
+
+    // Generate the code to project probe side records
+    g.setMappingSet(projectProbeMapping);
 
-    private void allocateVectors(){
-        for(VectorAllocator a : allocators){
-            a.alloc(RecordBatch.MAX_BATCH_SIZE);
+    int outputFieldId = fieldId;
+    fieldId = 0;
+    JExpression probeIndex = JExpr.direct("probeIndex");
+    int recordCount = 0;
+
+    if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      for (VectorWrapper<?> vv : left) {
+
+        MajorType inputType = vv.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
         }
-    }
 
-    public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-        super(popConfig, context);
-        this.left = left;
-        this.right = right;
-        this.joinType = popConfig.getJoinType();
-        this.conditions = popConfig.getConditions();
-    }
+        ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
+        container.add(v);
+        allocators.add(RemovingRecordBatch.getAllocator4(v));
+
+        JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
+        JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
 
-    @Override
-    public void killIncoming() {
-        this.left.kill();
-        this.right.kill();
+        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+
+        fieldId++;
+        outputFieldId++;
+      }
+      g.rotateBlock();
+      g.getEvalBlock()._return(JExpr.TRUE);
+
+      recordCount = left.getRecordCount();
     }
 
-    @Override
-    public void cleanup() {
-        hjHelper.clear();
 
-        // If we didn't receive any data, hyperContainer may be null, check before clearing
-        if (hyperContainer != null) {
-            hyperContainer.clear();
-        }
+    HashJoinProbe hj = context.getImplementationClass(cg);
 
-        if (hashTable != null) {
-            hashTable.clear();
-        }
-        super.cleanup();
-        left.cleanup();
-        right.cleanup();
+    hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
+    return hj;
+  }
+
+  private void allocateVectors(){
+    for(VectorAllocator a : allocators){
+      a.alloc(RecordBatch.MAX_BATCH_SIZE);
+    }
+  }
+
+  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context);
+    this.left = left;
+    this.right = right;
+    this.joinType = popConfig.getJoinType();
+    this.conditions = popConfig.getConditions();
+  }
+
+  @Override
+  public void killIncoming() {
+    this.left.kill();
+    this.right.kill();
+  }
+
+  @Override
+  public void cleanup() {
+    hjHelper.clear();
+
+    // If we didn't receive any data, hyperContainer may be null, check before clearing
+    if (hyperContainer != null) {
+      hyperContainer.clear();
+    }
+
+    if (hashTable != null) {
+      hashTable.clear();
     }
+    super.cleanup();
+    left.cleanup();
+    right.cleanup();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
index 19a4a29..d925958 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -29,9 +29,9 @@ import java.util.List;
 
 public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
 
-    @Override
-    public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
-        Preconditions.checkArgument(children.size() == 2);
-        return new HashJoinBatch(config, context, children.get(0), children.get(1));
-    }
+  @Override
+  public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 2);
+    return new HashJoinBatch(config, context, children.get(0), children.get(1));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index b1ed07e..a634827 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -51,183 +51,183 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
  */
 public class HashJoinHelper {
 
-    /* List of start indexes. Stores the record and batch index of the first record
-     * with a give key.
-     */
-    List<SelectionVector4> startIndices = new ArrayList<>();
-
-    // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
-    List<BuildInfo> buildInfoList = new ArrayList<>();
+  /* List of start indexes. Stores the record and batch index of the first record
+   * with a given key.
+   */
+  List<SelectionVector4> startIndices = new ArrayList<>();
 
-    // Fragment context
-    FragmentContext context;
-    BufferAllocator allocator;
+  // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
+  List<BuildInfo> buildInfoList = new ArrayList<>();
 
-    // Constant to indicate index is empty.
-    static final int INDEX_EMPTY = -1;
+  // Fragment context
+  FragmentContext context;
+  BufferAllocator allocator;
 
-    // bits to shift while obtaining batch index from SV4
-    static final int SHIFT_SIZE = 16;
+  // Constant to indicate index is empty.
+  static final int INDEX_EMPTY = -1;
 
-    public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
-        this.context = context;
-        this.allocator = allocator;
-    }
+  // bits to shift while obtaining batch index from SV4
+  static final int SHIFT_SIZE = 16;
 
-    public void addStartIndexBatch() throws SchemaChangeException {
-        startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
-    }
+  public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
+    this.context = context;
+    this.allocator = allocator;
+}
 
-    public class BuildInfo {
-        // List of links. Logically it helps maintain a linked list of records with the same key value
-        private SelectionVector4 links;
+  public void addStartIndexBatch() throws SchemaChangeException {
+    startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
+  }
 
-        // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
-        private BitSet keyMatchBitVector;
+  public class BuildInfo {
+    // List of links. Logically it helps maintain a linked list of records with the same key value
+    private SelectionVector4 links;
 
-        // number of records in this batch
-        private int recordCount;
+    // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
+    private BitSet keyMatchBitVector;
 
-        public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
-            this.links = links;
-            this.keyMatchBitVector = keyMatchBitVector;
-            this.recordCount = recordCount;
-        }
+    // number of records in this batch
+    private int recordCount;
 
-        public SelectionVector4 getLinks() {
-            return links;
-        }
+    public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
+      this.links = links;
+      this.keyMatchBitVector = keyMatchBitVector;
+      this.recordCount = recordCount;
+    }
 
-        public BitSet getKeyMatchBitVector() {
-            return keyMatchBitVector;
-        }
+    public SelectionVector4 getLinks() {
+      return links;
     }
 
-    public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
+    public BitSet getKeyMatchBitVector() {
+      return keyMatchBitVector;
+    }
+  }
 
-        ByteBuf vector = allocator.buffer((recordCount * 4));
+  public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
 
-        SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
+    ByteBuf vector = allocator.buffer((recordCount * 4));
 
-        // Initialize the vector
-        for (int i = 0; i < recordCount; i++) {
-            sv4.set(i, INDEX_EMPTY);
-        }
+    SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
 
-        return sv4;
+    // Initialize the vector
+    for (int i = 0; i < recordCount; i++) {
+      sv4.set(i, INDEX_EMPTY);
     }
 
-    public void addNewBatch(int recordCount) throws SchemaChangeException {
-        // Add a node to the list of BuildInfo's
-        BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
-        buildInfoList.add(info);
-    }
+    return sv4;
+  }
 
-    public int getStartIndex(int keyIndex) {
-        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
-        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+  public void addNewBatch(int recordCount) throws SchemaChangeException {
+    // Add a node to the list of BuildInfo's
+    BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
+    buildInfoList.add(info);
+  }
 
-        assert batchIdx < startIndices.size();
+  public int getStartIndex(int keyIndex) {
+    int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
+    int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
 
-        SelectionVector4 sv4 = startIndices.get(batchIdx);
+    assert batchIdx < startIndices.size();
 
-        return sv4.get(offsetIdx);
-    }
+    SelectionVector4 sv4 = startIndices.get(batchIdx);
 
-    public int getNextIndex(int currentIdx) {
-        // Get to the links field of the current index to get the next index
-        int batchIdx = currentIdx >>> SHIFT_SIZE;
-        int recordIdx = currentIdx & HashTable.BATCH_MASK;
+    return sv4.get(offsetIdx);
+  }
 
-        assert batchIdx < buildInfoList.size();
+  public int getNextIndex(int currentIdx) {
+    // Get to the links field of the current index to get the next index
+    int batchIdx = currentIdx >>> SHIFT_SIZE;
+    int recordIdx = currentIdx & HashTable.BATCH_MASK;
 
-        // Get the corresponding BuildInfo node
-        BuildInfo info = buildInfoList.get(batchIdx);
-        return info.getLinks().get(recordIdx);
-    }
+    assert batchIdx < buildInfoList.size();
 
-    public List<Integer> getNextUnmatchedIndex() {
-        List<Integer> compositeIndexes = new ArrayList<>();
+    // Get the corresponding BuildInfo node
+    BuildInfo info = buildInfoList.get(batchIdx);
+    return info.getLinks().get(recordIdx);
+  }
 
-        for (int i = 0; i < buildInfoList.size(); i++) {
-            BuildInfo info = buildInfoList.get(i);
-            int fromIndex = 0;
+  public List<Integer> getNextUnmatchedIndex() {
+    List<Integer> compositeIndexes = new ArrayList<>();
 
-            while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
-                compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
-                fromIndex++;
-            }
-        }
-        return compositeIndexes;
+    for (int i = 0; i < buildInfoList.size(); i++) {
+      BuildInfo info = buildInfoList.get(i);
+      int fromIndex = 0;
+
+      while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
+          compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
+          fromIndex++;
+      }
     }
+    return compositeIndexes;
+  }
 
-    public void setRecordMatched(int index) {
-        int batchIdx  = index >>> SHIFT_SIZE;
-        int recordIdx = index & HashTable.BATCH_MASK;
+  public void setRecordMatched(int index) {
+    int batchIdx  = index >>> SHIFT_SIZE;
+    int recordIdx = index & HashTable.BATCH_MASK;
 
-        // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
-        BuildInfo info = buildInfoList.get(batchIdx);
-        BitSet bitVector = info.getKeyMatchBitVector();
+    // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
+    BuildInfo info = buildInfoList.get(batchIdx);
+    BitSet bitVector = info.getKeyMatchBitVector();
 
-        bitVector.set(recordIdx);
-    }
+    bitVector.set(recordIdx);
+  }
 
-    public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
+  public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
 
-        /* set the current record batch index and the index
-         * within the batch at the specified keyIndex. The keyIndex
-         * denotes the global index where the key for this record is
-         * stored in the hash table
+    /* set the current record batch index and the index
+     * within the batch at the specified keyIndex. The keyIndex
+     * denotes the global index where the key for this record is
+     * stored in the hash table
+     */
+    int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
+    int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+    if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
+        // allocate a new batch
+      addStartIndexBatch();
+    }
+
+    SelectionVector4 startIndex = startIndices.get(batchIdx);
+    int linkIndex;
+
+    // If head of the list is empty, insert current index at this position
+    if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
+      startIndex.set(offsetIdx, batchIndex, recordIndex);
+    } else {
+      /* Head of this list is not empty, if the first link
+       * is empty insert there
+       */
+      batchIdx = linkIndex >>> SHIFT_SIZE;
+      offsetIdx = linkIndex & Character.MAX_VALUE;
+
+      SelectionVector4 link = buildInfoList.get(batchIdx).getLinks();
+      int firstLink = link.get(offsetIdx);
+
+      if (firstLink == INDEX_EMPTY) {
+        link.set(offsetIdx, batchIndex, recordIndex);
+      } else {
+        /* Insert the current value as the first link and
+         * make the current first link as its next
          */
-        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
-        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
-
-        if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
-            // allocate a new batch
-            addStartIndexBatch();
-        }
-
-        SelectionVector4 startIndex = startIndices.get(batchIdx);
-        int linkIndex;
-
-        // If head of the list is empty, insert current index at this position
-        if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
-            startIndex.set(offsetIdx, batchIndex, recordIndex);
-        } else {
-            /* Head of this list is not empty, if the first link
-             * is empty insert there
-             */
-            batchIdx = linkIndex >>> SHIFT_SIZE;
-            offsetIdx = linkIndex & Character.MAX_VALUE;
-
-            SelectionVector4 link = buildInfoList.get(batchIdx).getLinks();
-            int firstLink = link.get(offsetIdx);
-
-            if (firstLink == INDEX_EMPTY) {
-                link.set(offsetIdx, batchIndex, recordIndex);
-            } else {
-                /* Insert the current value as the first link and
-                 * make the current first link as its next
-                 */
-                int firstLinkBatchIdx  = firstLink >>> SHIFT_SIZE;
-                int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE;
-
-                SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks();
-                nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx);
-
-                link.set(offsetIdx, batchIndex, recordIndex);
-            }
-        }
+        int firstLinkBatchIdx  = firstLink >>> SHIFT_SIZE;
+        int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE;
+
+        SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks();
+        nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx);
+
+        link.set(offsetIdx, batchIndex, recordIndex);
+      }
     }
+  }
 
-    public void clear() {
-        // Clear the SV4 used for start indices
-        for (SelectionVector4 sv4: startIndices) {
-            sv4.clear();
-        }
+  public void clear() {
+    // Clear the SV4 used for start indices
+    for (SelectionVector4 sv4: startIndices) {
+      sv4.clear();
+    }
 
-        for (BuildInfo info : buildInfoList) {
-            info.getLinks().clear();
-        }
+    for (BuildInfo info : buildInfoList) {
+      info.getLinks().clear();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 160d352..6d20f60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -33,25 +33,25 @@ import org.eigenbase.rel.JoinRelType;
 import java.io.IOException;
 
 public interface HashJoinProbe {
-    public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+  public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
 
-    /* The probe side of the hash join can be in the following two states
-     * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
-     *    key match and if we do we project the record
-     * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
-     *    from the build side that did not match any records on the probe side. For Left outer
-     *    case we handle it internally by projecting the record if there isn't a match on the build side
-     * 3. DONE: Once we have projected all possible records we are done
-     */
-    public static enum ProbeState {
-        PROBE_PROJECT, PROJECT_RIGHT, DONE
-    }
+  /* The probe side of the hash join can be in the following three states
+   * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
+   *    key match and if we do we project the record
+   * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
+   *    from the build side that did not match any records on the probe side. For Left outer
+   *    case we handle it internally by projecting the record if there isn't a match on the build side
+   * 3. DONE: Once we have projected all possible records we are done
+   */
+  public static enum ProbeState {
+    PROBE_PROJECT, PROJECT_RIGHT, DONE
+  }
 
-    public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
-                                            int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
-                                            JoinRelType joinRelType);
-    public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
-    public abstract int  probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
-    public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
-    public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
+  public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+                                          int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
+                                          JoinRelType joinRelType);
+  public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
+  public abstract int  probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
+  public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
+  public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index a3e3b74..2a8be54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -35,196 +35,196 @@ import java.util.List;
 
 public abstract class HashJoinProbeTemplate implements HashJoinProbe {
 
-    // Probe side record batch
-    private RecordBatch probeBatch;
+  // Probe side record batch
+  private RecordBatch probeBatch;
 
-    // Join type, INNER, LEFT, RIGHT or OUTER
-    private JoinRelType joinType;
+  // Join type: INNER, LEFT, RIGHT or FULL
+  private JoinRelType joinType;
 
-    /* Helper class
-     * Maintains linked list of build side records with the same key
-     * Keeps information about which build records have a corresponding
-     * matching key in the probe side (for outer, right joins)
-     */
-    private HashJoinHelper hjHelper = null;
+  /* Helper class
+   * Maintains linked list of build side records with the same key
+   * Keeps information about which build records have a corresponding
+   * matching key in the probe side (for outer, right joins)
+   */
+  private HashJoinHelper hjHelper = null;
 
-    // Underlying hashtable used by the hash join
-    private HashTable hashTable = null;
+  // Underlying hashtable used by the hash join
+  private HashTable hashTable = null;
 
-    // Number of records to process on the probe side
-    private int recordsToProcess = 0;
+  // Number of records to process on the probe side
+  private int recordsToProcess = 0;
 
-    // Number of records processed on the probe side
-    private int recordsProcessed = 0;
+  // Number of records processed on the probe side
+  private int recordsProcessed = 0;
 
-    // Number of records in the output container
-    private int outputRecords;
+  // Number of records in the output container
+  private int outputRecords;
 
-    // Indicate if we should drain the next record from the probe side
-    private boolean getNextRecord = true;
+  // Indicate if we should drain the next record from the probe side
+  private boolean getNextRecord = true;
 
-    // Contains both batch idx and record idx of the matching record in the build side
-    private int currentCompositeIdx = -1;
+  // Contains both batch idx and record idx of the matching record in the build side
+  private int currentCompositeIdx = -1;
 
-    // Current state the hash join algorithm is in
-    private ProbeState probeState = ProbeState.PROBE_PROJECT;
+  // Current state the hash join algorithm is in
+  private ProbeState probeState = ProbeState.PROBE_PROJECT;
 
-    // For outer or right joins, this is a list of unmatched records that needs to be projected
-    private List<Integer> unmatchedBuildIndexes = null;
+  // For outer or right joins, this is a list of unmatched records that needs to be projected
+  private List<Integer> unmatchedBuildIndexes = null;
 
-    @Override
-    public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
-                                   int probeRecordCount, RecordBatch outgoing, HashTable hashTable,
-                                   HashJoinHelper hjHelper, JoinRelType joinRelType) {
+  @Override
+  public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+                                 int probeRecordCount, RecordBatch outgoing, HashTable hashTable,
+                                 HashJoinHelper hjHelper, JoinRelType joinRelType) {
 
-        this.probeBatch = probeBatch;
-        this.joinType = joinRelType;
-        this.recordsToProcess = probeRecordCount;
-        this.hashTable = hashTable;
-        this.hjHelper = hjHelper;
+    this.probeBatch = probeBatch;
+    this.joinType = joinRelType;
+    this.recordsToProcess = probeRecordCount;
+    this.hashTable = hashTable;
+    this.hjHelper = hjHelper;
 
-        doSetup(context, buildBatch, probeBatch, outgoing);
+    doSetup(context, buildBatch, probeBatch, outgoing);
+  }
+
+  public void executeProjectRightPhase() {
+    while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
+      boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
+      assert success;
     }
+  }
 
-    public void executeProjectRightPhase() {
-        while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
-            boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
-            assert success;
+  public void executeProbePhase() throws SchemaChangeException {
+    while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+
+      // If we have processed all records in this batch, we need to invoke next() on the probe side
+      if (recordsProcessed == recordsToProcess) {
+
+        // Done processing all records in the previous batch, clean up!
+        for (VectorWrapper<?> wrapper : probeBatch) {
+          wrapper.getValueVector().clear();
         }
-    }
 
-    public void executeProbePhase() throws SchemaChangeException {
-        while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+        IterOutcome leftUpstream = probeBatch.next();
 
-            // Check if we have processed all records in this batch we need to invoke next
-            if (recordsProcessed == recordsToProcess) {
+        switch (leftUpstream) {
+          case NONE:
+          case NOT_YET:
+          case STOP:
+            recordsProcessed = 0;
+            recordsToProcess = 0;
+            probeState = ProbeState.DONE;
 
-                // Done processing all records in the previous batch, clean up!
-                for (VectorWrapper<?> wrapper : probeBatch) {
-                    wrapper.getValueVector().clear();
-                }
+            // We are done with the probe phase. If it's a RIGHT or a FULL join, get the unmatched indexes from the build side
+            if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
+                probeState = ProbeState.PROJECT_RIGHT;
+            }
 
-                IterOutcome leftUpstream = probeBatch.next();
+            continue;
 
-                switch (leftUpstream) {
-                    case NONE:
-                    case NOT_YET:
-                    case STOP:
-                        recordsProcessed = 0;
-                        recordsToProcess = 0;
-                        probeState = ProbeState.DONE;
+          case OK_NEW_SCHEMA:
+            throw new SchemaChangeException("Hash join does not support schema changes");
+          case OK:
+            recordsToProcess = probeBatch.getRecordCount();
+            recordsProcessed = 0;
+        }
+      }
+      int probeIndex;
 
-                        // We are done with the probe phase. If its a RIGHT or a FULL join get the unmatched indexes from the build side
-                        if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
-                            probeState = ProbeState.PROJECT_RIGHT;
-                        }
+      // Check if we need to drain the next row in the probe side
+      if (getNextRecord) {
+          probeIndex = hashTable.containsKey(recordsProcessed, true);
 
-                        continue;
+          if (probeIndex != -1) {
 
-                    case OK_NEW_SCHEMA:
-                        throw new SchemaChangeException("Hash join does not support schema changes");
-                    case OK:
-                        recordsToProcess = probeBatch.getRecordCount();
-                        recordsProcessed = 0;
-                }
-            }
-            int probeIndex;
-
-            // Check if we need to drain the next row in the probe side
-            if (getNextRecord) {
-                probeIndex = hashTable.containsKey(recordsProcessed, true);
-
-                if (probeIndex != -1) {
-
-                    /* The current probe record has a key that matches. Get the index
-                     * of the first row in the build side that matches the current key
-                     */
-                    currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
-
-                    /* Record in the build side at currentCompositeIdx has a matching record in the probe
-                     * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
-                     * join we keep track of which records we need to project at the end
-                     */
-                    hjHelper.setRecordMatched(currentCompositeIdx);
-
-                    boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
-                    assert success;
-                    success = projectProbeRecord(recordsProcessed, outputRecords);
-                    assert success;
-                    outputRecords++;
-
-                    /* Projected single row from the build side with matching key but there
-                     * may be more rows with the same key. Check if that's the case
-                     */
-                    currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-                    if (currentCompositeIdx == -1) {
-                        /* We only had one row in the build side that matched the current key
-                         * from the probe side. Drain the next row in the probe side.
-                         */
-                        recordsProcessed++;
-                    }
-                    else {
-                        /* There is more than one row with the same key on the build side
-                         * don't drain more records from the probe side till we have projected
-                         * all the rows with this key
-                         */
-                        getNextRecord = false;
-                    }
-                }
-                else { // No matching key
-
-                    // If we have a left outer join, project the keys
-                    if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-                        projectProbeRecord(recordsProcessed, outputRecords++);
-                    }
-                    recordsProcessed++;
-                }
+            /* The current probe record has a key that matches. Get the index
+             * of the first row in the build side that matches the current key
+             */
+            currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
+
+            /* Record in the build side at currentCompositeIdx has a matching record in the probe
+             * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
+             * join we keep track of which records we need to project at the end
+             */
+            hjHelper.setRecordMatched(currentCompositeIdx);
+
+            boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+            assert success;
+            success = projectProbeRecord(recordsProcessed, outputRecords);
+            assert success;
+            outputRecords++;
+
+            /* Projected single row from the build side with matching key but there
+             * may be more rows with the same key. Check if that's the case
+             */
+            currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+            if (currentCompositeIdx == -1) {
+              /* We only had one row in the build side that matched the current key
+               * from the probe side. Drain the next row in the probe side.
+               */
+              recordsProcessed++;
             }
             else {
-                hjHelper.setRecordMatched(currentCompositeIdx);
-                boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
-                assert success;
-                projectProbeRecord(recordsProcessed, outputRecords);
-                outputRecords++;
-
-                currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-
-                if (currentCompositeIdx == -1) {
-                    // We don't have any more rows matching the current key on the build side, move on to the next probe row
-                    getNextRecord = true;
-                    recordsProcessed++;
-                }
+              /* There is more than one row with the same key on the build side
+               * don't drain more records from the probe side till we have projected
+               * all the rows with this key
+               */
+              getNextRecord = false;
             }
         }
-    }
+          else { // No matching key
 
-    public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
+            // If we have a left outer join, project the keys
+            if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+              projectProbeRecord(recordsProcessed, outputRecords++);
+            }
+            recordsProcessed++;
+          }
+      }
+      else {
+        hjHelper.setRecordMatched(currentCompositeIdx);
+        boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+        assert success;
+        projectProbeRecord(recordsProcessed, outputRecords);
+        outputRecords++;
+
+        currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+
+        if (currentCompositeIdx == -1) {
+          // We don't have any more rows matching the current key on the build side, move on to the next probe row
+          getNextRecord = true;
+          recordsProcessed++;
+        }
+      }
+    }
+  }
 
-        outputRecords = 0;
+  public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
 
-        if (probeState == ProbeState.PROBE_PROJECT) {
-            executeProbePhase();
-        }
+    outputRecords = 0;
 
-        if (probeState == ProbeState.PROJECT_RIGHT) {
+    if (probeState == ProbeState.PROBE_PROJECT) {
+      executeProbePhase();
+    }
 
-            // We are here because we have a RIGHT OUTER or a FULL join
-            if (unmatchedBuildIndexes == null) {
-                // Initialize list of build indexes that didn't match a record on the probe side
-                unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
-                recordsToProcess = unmatchedBuildIndexes.size();
-                recordsProcessed = 0;
-            }
+    if (probeState == ProbeState.PROJECT_RIGHT) {
 
-            // Project the list of unmatched records on the build side
-            executeProjectRightPhase();
-        }
+      // We are here because we have a RIGHT OUTER or a FULL join
+      if (unmatchedBuildIndexes == null) {
+        // Initialize list of build indexes that didn't match a record on the probe side
+        unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
+        recordsToProcess = unmatchedBuildIndexes.size();
+        recordsProcessed = 0;
+      }
 
-        return outputRecords;
+      // Project the list of unmatched records on the build side
+      executeProjectRightPhase();
     }
 
-    public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
-                                 @Named("outgoing") RecordBatch outgoing);
-    public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
-    public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+    return outputRecords;
+  }
+
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
+                               @Named("outgoing") RecordBatch outgoing);
+  public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+  public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
 }