You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/29 05:43:36 UTC

[3/7] git commit: DRILL-190 (part2) - MergeJoinBatch handles record batches - JoinStatus tracks state across input and output batches - MergeJoinBatchBuilder builds a selection vector of right-side batches which may be rescanned - implement code stub

DRILL-190 (part2)
 - MergeJoinBatch handles record batches
 - JoinStatus tracks state across input and output batches
 - MergeJoinBatchBuilder builds a selection vector of right-side batches which may be rescanned
 - implement code stubs for merge join
 - add field expression parsing and start of generated merge join code
 - code generator support for merge-join's copyLeft(), copyRight(), compare() and compareNextLeftKey()
 - add line prefixes to generated code log
 - support VectorContainers in declareVectorValueSetupAndMember()
 - fix vector allocation in MergeJoinBatch
 - fix missing values from left batch when right batch has been exhausted
 - fix nullable handling in generated merge join code.  make simple merge join test use multiple batches.
 - fixes for sv4 batch support, additional multi batch test


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e0bac2f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e0bac2f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e0bac2f0

Branch: refs/heads/master
Commit: e0bac2f0064be181fd03c18e1bfb243492cd1792
Parents: 8ceee5d
Author: Ben Becker <be...@gmail.com>
Authored: Thu Aug 15 21:16:27 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:36:45 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/compile/JaninoClassCompiler.java |  23 +-
 .../exec/compile/sig/GeneratorMapping.java      |   5 +
 .../physical/base/AbstractPhysicalVisitor.java  |   4 +-
 .../drill/exec/physical/base/GroupScan.java     |   4 +
 .../exec/physical/base/PhysicalVisitor.java     |   4 +-
 .../drill/exec/physical/base/SubScan.java       |   4 +
 .../exec/physical/config/MergeJoinPOP.java      |   4 +
 .../drill/exec/physical/impl/ImplCreator.java   |   9 +
 .../exec/physical/impl/join/JoinEvaluator.java  |   9 +-
 .../physical/impl/join/JoinInnerSignature.java  |  35 ++
 .../exec/physical/impl/join/JoinStatus.java     |  80 +++--
 .../exec/physical/impl/join/JoinTemplate.java   | 110 ++++--
 .../exec/physical/impl/join/JoinWorker.java     |  14 +-
 .../exec/physical/impl/join/MergeJoinBatch.java | 341 +++++++++++++++++--
 .../impl/join/MergeJoinBatchBuilder.java        |   6 +-
 .../physical/impl/join/MergeJoinCreator.java    |  38 +++
 .../exec/physical/impl/join/TestMergeJoin.java  | 220 ++++++++++++
 .../src/test/resources/join/merge_join.json     |  52 +++
 .../test/resources/join/merge_multi_batch.json  |  47 +++
 .../resources/join/merge_multi_batch.left.json  |  13 +
 .../resources/join/merge_multi_batch.right.json |  11 +
 .../test/resources/join/merge_single_batch.json |  37 ++
 .../resources/join/merge_single_batch.left.json |  13 +
 .../join/merge_single_batch.right.json          |  11 +
 24 files changed, 996 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
index abe2afe..154aca4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
@@ -44,7 +44,9 @@ public class JaninoClassCompiler implements ClassCompiler{
   }
 
   public byte[] getClassByteCode(final String className, final String code) throws CompileException, IOException, ClassNotFoundException, ClassTransformationException {
-    logger.debug("Compiling:\n {}", code);
+    if(logger.isDebugEnabled()){
+      logger.debug("Compiling:\n {}", prefixLineNumbers(code));
+    }
     StringReader reader = new StringReader(code);
     Scanner scanner = new Scanner((String) null, reader);
     Java.CompilationUnit compilationUnit = new Parser(scanner).parseCompilationUnit();
@@ -55,6 +57,25 @@ public class JaninoClassCompiler implements ClassCompiler{
     return classFiles[0].toByteArray();
   }
 
+
+  private String prefixLineNumbers(String code) {
+    if (!debugLines) return code;
+    StringBuilder out = new StringBuilder();
+    int i = 1;
+    for (String line : code.split("\n")) {
+      int start = out.length();
+      out.append(i++);
+      int numLength = out.length() - start;
+      out.append(":");
+      for (int spaces = 0; spaces < 7 - numLength; ++spaces){
+        out.append(" ");
+      }
+      out.append(line);
+      out.append('\n');
+    }
+    return out.toString();
+  }
+
   public void setDebuggingInformation(boolean debugSource, boolean debugLines, boolean debugVars) {
     this.debugSource = debugSource;
     this.debugLines = debugLines;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
index 8646b9b..09639df 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
@@ -12,6 +12,7 @@ public class GeneratorMapping {
   private String reset;
   private String cleanup;
   
+
   public GeneratorMapping(String setup, String eval, String reset, String cleanup) {
     super();
     this.setup = setup;
@@ -20,6 +21,10 @@ public class GeneratorMapping {
     this.cleanup = cleanup;
   }
 
+  public static GeneratorMapping GM(String setup, String eval){
+    return create(setup, eval, null, null);
+  }
+  
   public static GeneratorMapping GM(String setup, String eval, String reset, String cleanup){
     return create(setup, eval, reset, cleanup);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index ad41452..c997db4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.config.RangeSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.config.UnionExchange;
 
 public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
@@ -38,7 +40,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
-  public T visitUnion(UnionExchange union, X value) throws E {
+  public T visitUnion(Union union, X value) throws E {
     return visitOp(union, value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index acafd6c..870792a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -25,6 +25,10 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+/**
+ * A GroupScan operator represents all data which will be scanned by a given physical
+ * plan.  It is the superset of all SubScans for the plan.
+ */
 public interface GroupScan extends Scan, HasAffinity{
 
   public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 5f50422..97e6795 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.config.RangeSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.config.UnionExchange;
 
 /**
@@ -45,7 +47,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
 
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
-  public RETURN visitUnion(UnionExchange union, EXTRA value) throws EXCEP;
+  public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
   public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
index f75ba19..9d00c82 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
@@ -19,5 +19,9 @@ package org.apache.drill.exec.physical.base;
 
 import org.apache.drill.exec.physical.ReadEntry;
 
+/**
+ * A SubScan operator represents the data scanned by a particular major/minor fragment.  This is in contrast to
+ * a GroupScan operator, which represents all data scanned by a physical plan.
+ */
 public interface SubScan extends Scan {
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 05fee19..19351cc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -61,4 +61,8 @@ public class MergeJoinPOP extends AbstractBase{
   public Iterator<PhysicalOperator> iterator() {
     return Iterators.forArray(left, right);
   }
+
+  public List<JoinCondition> getConditions() {
+    return conditions;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index fb4b371..9984454 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.RandomReceiver;
 import org.apache.drill.exec.physical.config.Screen;
@@ -37,7 +38,9 @@ import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
+import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
+import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
@@ -73,6 +76,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private SVRemoverCreator svc = new SVRemoverCreator();
   private SortBatchCreator sbc = new SortBatchCreator();
   private AggBatchCreator abc = new AggBatchCreator();
+  private MergeJoinCreator mjc = new MergeJoinCreator();
   private RootExec root = null;
   
   private ImplCreator(){}
@@ -118,6 +122,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   }
 
   @Override
+  public RecordBatch visitMergeJoin(MergeJoinPOP op, FragmentContext context) throws ExecutionSetupException {
+    return mjc.getBatch(context, op, getChildren(op, context));
+  }
+
+  @Override
   public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
     Preconditions.checkArgument(root == null);
     root = sc.getRoot(context, op, getChildren(op, context));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
index 42ca604..beb3e28 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
@@ -1,9 +1,10 @@
 package org.apache.drill.exec.physical.impl.join;
 
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorContainer;
 
 public interface JoinEvaluator {
-  public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
-  public abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
-  public abstract int compare(int leftPosition, int rightPosition);
+  public abstract void doSetup(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
new file mode 100644
index 0000000..1081244
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorContainer;
+
+
+public interface JoinInnerSignature extends CodeGeneratorSignature {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 8831006..c755e5f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -12,23 +12,24 @@ public final class JoinStatus {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
 
   public static enum RightSourceMode {
-    INCOMING_BATCHES, QUEUED_BATCHES;
+    INCOMING, SV4;
   }
 
-  public int leftPosition;
-  private final RecordBatch left;
+  public final RecordBatch left;
+  private int leftPosition;
   private IterOutcome lastLeft;
 
-  public int rightPosition;
-  public int svRightPosition;
-  private final RecordBatch right;
+  public final RecordBatch right;
+  private int rightPosition;
+  private int svRightPosition;
   private IterOutcome lastRight;
   
-  public int outputPosition;
-  public RightSourceMode rightSourceMode = RightSourceMode.INCOMING_BATCHES;
+  private int outputPosition;
+  public RightSourceMode rightSourceMode = RightSourceMode.INCOMING;
   public MergeJoinBatch outputBatch;
   public SelectionVector4 sv4;
 
+  public boolean ok = true;
   private boolean initialSet = false;
   private boolean leftRepeating = false;
   
@@ -52,11 +53,10 @@ public final class JoinStatus {
   }
 
   public final void advanceRight(){
-    if (rightSourceMode == RightSourceMode.INCOMING_BATCHES)
+    if (rightSourceMode == RightSourceMode.INCOMING)
       rightPosition++;
-    else {
-      // advance through queued batches
-    }
+    else
+      svRightPosition++;
   }
 
   public final int getLeftPosition() {
@@ -64,7 +64,24 @@ public final class JoinStatus {
   }
 
   public final int getRightPosition() {
-    return (rightSourceMode == RightSourceMode.INCOMING_BATCHES) ? rightPosition : svRightPosition;
+    return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
+  }
+
+  public final void setRightPosition(int pos) {
+    rightPosition = pos;
+  }
+
+
+  public final int getOutPosition() {
+    return outputPosition;
+  }
+
+  public final int fetchAndIncOutputPos() {
+    return outputPosition++;
+  }
+
+  public final void resetOutputPos() {
+    outputPosition = 0;
   }
 
   public final void notifyLeftRepeating() {
@@ -74,6 +91,7 @@ public final class JoinStatus {
 
   public final void notifyLeftStoppedRepeating() {
     leftRepeating = false;
+    svRightPosition = 0;
   }
 
   public final boolean isLeftRepeating() {
@@ -81,12 +99,11 @@ public final class JoinStatus {
   }
 
   public void setDefaultAdvanceMode() {
-    rightSourceMode = RightSourceMode.INCOMING_BATCHES;
-    rightPosition = 0;
+    rightSourceMode = RightSourceMode.INCOMING;
   }
 
-  public void setRepeatedAdvanceMode() {
-    rightSourceMode = RightSourceMode.QUEUED_BATCHES;
+  public void setSV4AdvanceMode() {
+    rightSourceMode = RightSourceMode.SV4;
     svRightPosition = 0;
   }
 
@@ -95,7 +112,7 @@ public final class JoinStatus {
    * Side effect: advances to next left batch if current left batch size is exceeded.
    */
   public final boolean isLeftPositionAllowed(){
-    if(!isNextLeftPositionInCurrentBatch()){
+    if(!isLeftPositionInCurrentBatch()){
       leftPosition = 0;
       lastLeft = left.next();
       return lastLeft == IterOutcome.OK;
@@ -110,7 +127,10 @@ public final class JoinStatus {
    * Side effect: advances to next right batch if current right batch size is exceeded
    */
   public final boolean isRightPositionAllowed(){
-    if(isNextRightPositionInCurrentBatch()){
+    if (rightSourceMode == RightSourceMode.SV4)
+      return svRightPosition < sv4.getCount();
+
+    if(!isRightPositionInCurrentBatch()){
       rightPosition = 0;
       lastRight = right.next();
       return lastRight == IterOutcome.OK;
@@ -124,18 +144,34 @@ public final class JoinStatus {
   /**
    * Check if the left record position can advance by one in the current batch.
    */
-  public final boolean isNextLeftPositionInCurrentBatch() {
+  public final boolean isLeftPositionInCurrentBatch() {
     return leftPosition < left.getRecordCount();
   }
 
   /**
-   * Check if the left record position can advance by one in the current batch.
+   * Check if the right record position can advance by one in the current batch.
    */
-  public final boolean isNextRightPositionInCurrentBatch() {
+  public final boolean isRightPositionInCurrentBatch() {
     return rightPosition < right.getRecordCount();
   }
 
+  /**
+   * Check if the record after the current left position is within the current batch.
+   */
+  public final boolean isNextLeftPositionInCurrentBatch() {
+    return leftPosition + 1 < left.getRecordCount();
+  }
+
+  /**
+   * Check if the record after the current right position is within the current batch.
+   */
+  public final boolean isNextRightPositionInCurrentBatch() {
+    return rightPosition + 1 < right.getRecordCount();
+  }
+
   public JoinOutcome getOutcome(){
+    if (!ok)
+      return JoinOutcome.FAILURE;
     if (lastLeft == IterOutcome.OK && lastRight == IterOutcome.OK)
       return JoinOutcome.BATCH_RETURNED;
     if (eitherMatches(IterOutcome.OK_NEW_SCHEMA))

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 51cc5e5..5feb5ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -1,6 +1,9 @@
 package org.apache.drill.exec.physical.impl.join;
 
-import org.apache.drill.exec.record.RecordBatch;
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorContainer;
 
 /**
@@ -52,11 +55,10 @@ import org.apache.drill.exec.record.VectorContainer;
  *   - this is required since code may be regenerated before completion of an outgoing record batch.
  */
 public abstract class JoinTemplate implements JoinWorker {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinTemplate.class);
 
   @Override
-  public void setupJoin(JoinStatus status, VectorContainer outgoing){
-    
+  public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException {
+    doSetup(context, status, outgoing);
   }
 
   /**
@@ -67,53 +69,84 @@ public abstract class JoinTemplate implements JoinWorker {
     while (true) {
       // for each record
 
-      // validate position and advance to the next record batch if necessary
-      if (!status.isLeftPositionAllowed()) return;
-      if (!status.isRightPositionAllowed()) return;
+      // validate input iterators (advancing to the next record batch if necessary)
+      if (!status.isRightPositionAllowed()) {
+        // we've hit the end of the right record batch; copy any remaining values from the left batch
+        while (status.isLeftPositionAllowed()) {
+          doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+          status.advanceLeft();
+        }
+        return;
+      }
+      if (!status.isLeftPositionAllowed())
+        return;
 
-      int comparison = compare(status.leftPosition, status.rightPosition);
+      int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
       switch (comparison) {
 
       case -1:
         // left key < right key
+        doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
         status.advanceLeft();
         continue;
 
       case 0:
         // left key == right key
+
+        // check for repeating values on the left side
         if (!status.isLeftRepeating() &&
             status.isNextLeftPositionInCurrentBatch() &&
-            compareNextLeftKey(status.leftPosition) == 0) {
-          // records in the left batch contain duplicate keys
-          // TODO: leftHasDups = true, if next left key matches but is in a new batch
+            doCompareNextLeftKey(status.getLeftPosition()) == 0)
+          // subsequent record(s) in the left batch have the same key
           status.notifyLeftRepeating();
-        }
+
+        else if (status.isLeftRepeating() &&
+                 status.isNextLeftPositionInCurrentBatch() &&
+                 doCompareNextLeftKey(status.getLeftPosition()) != 0)
+          // this record marks the end of repeated keys
+          status.notifyLeftStoppedRepeating();
         
+        boolean crossedBatchBoundaries = false;
+        int initialRightPosition = status.getRightPosition();
         do {
           // copy all equal right keys to the output record batch
-          if (!copy(status.leftPosition, status.rightPosition, status.outputPosition++))
+          if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
             return;
 
-          // If the left key has duplicates and we're about to cross batch boundaries, queue the
-          // right table's record batch before calling next.  These records will need to be copied
-          // again for each duplicate left key.
+          if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos()))
+            return;
+          
+          // If the left key has duplicates and we're about to cross a boundary in the right batch, add the
+          // right table's record batch to the sv4 builder before calling next.  These records will need to be
+          // copied again for each duplicate left key.
           if (status.isLeftRepeating() && !status.isNextRightPositionInCurrentBatch()) {
-            // last record in right batch is a duplicate, and at the end of the batch
             status.outputBatch.addRightToBatchBuilder();
+            crossedBatchBoundaries = true;
           }
           status.advanceRight();
-        } while (status.isRightPositionAllowed() && compare(status.leftPosition, status.rightPosition) == 0);
 
+        } while (status.isRightPositionAllowed() && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0);
+
+        if (status.getRightPosition() > initialRightPosition && status.isLeftRepeating())
+          // more than one matching result from right table; reset position in case of subsequent left match
+          status.setRightPosition(initialRightPosition);
         status.advanceLeft();
 
-        if (status.isLeftRepeating() && compareNextLeftKey(status.leftPosition) != 0) {
+        if (status.isLeftRepeating() && doCompareNextLeftKey(status.getLeftPosition()) != 0) {
           // left no longer has duplicates.  switch back to incoming batch mode
           status.setDefaultAdvanceMode();
           status.notifyLeftStoppedRepeating();
-        } else if (status.isLeftRepeating()) {
-          // left is going to repeat; use sv4 for right batch
-          status.setRepeatedAdvanceMode();
-        }          
+        } else if (status.isLeftRepeating() && crossedBatchBoundaries) {
+          try {
+            // build the right batches and switch the right side to SV4 advance mode
+            status.outputBatch.batchBuilder.build();
+            status.setSV4AdvanceMode();
+          } catch (SchemaChangeException e) {
+            status.ok = false;
+          }
+          // return to indicate recompile in right-sv4 mode
+          return;
+        }
 
         continue;
 
@@ -128,17 +161,24 @@ public abstract class JoinTemplate implements JoinWorker {
     }
   }
 
-  
+  // Generated Methods
+
+  public abstract void doSetup(@Named("context") FragmentContext context,
+      @Named("status") JoinStatus status,
+      @Named("outgoing") VectorContainer outgoing) throws SchemaChangeException;
+
+
   /**
    * Copy the data to the new record batch (if it fits).
    *
    * @param leftPosition  position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
-   * @param rightPosition position of batch (lower 16 bits) and record (upper 16 bits) in right SV4
    * @param outputPosition position of the output record batch
    * @return Whether or not the data was copied.
    */
-  protected abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
-  
+  public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
+  public abstract boolean doCopyRight(@Named("rightIndex") int rightIndex, @Named("outIndex") int outIndex);
+
+
   /**
    * Compare the values of the left and right join key to determine whether the left is less than, greater than
    * or equal to the right.
@@ -149,7 +189,17 @@ public abstract class JoinTemplate implements JoinWorker {
    *         -1 if left is < right
    *          1 if left is > right
    */
-  protected abstract int compare(int leftPosition, int rightPosition);
-  protected abstract int compareNextLeftKey(int position);
-  public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
+  protected abstract int doCompare(@Named("leftIndex") int leftIndex,
+      @Named("rightIndex") int rightIndex);
+
+
+  /**
+   * Compare the current left key to the next left key, if it's within the batch.
+   * @return  0 if both keys are equal
+   *          1 if the keys are not equal
+   *         -1 if there are no more keys in this batch
+   */
+  protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex);
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 54d2076..6708279 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -1,20 +1,20 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorContainer;
 
 
 public interface JoinWorker {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinWorker.class);
   
   public static enum JoinOutcome {
-    NO_MORE_DATA, BATCH_RETURNED, MODE_CHANGED, SCHEMA_CHANGED, WAITING, FAILURE;
+    NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE;
   }
-  
-  public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<JoinWorker>( //
-      JoinWorker.class, "org.apache.drill.exec.physical.impl.mergejoin.JoinTemplate", JoinEvaluator.class, null);
 
-  
-  public void setupJoin(JoinStatus status, VectorContainer outgoing);
+  public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
   public void doJoin(JoinStatus status);
+  
+  public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 4d633bb..a2b84da 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -1,20 +1,26 @@
 package org.apache.drill.exec.physical.impl.join;
 
+import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
+
 import java.io.IOException;
-import java.util.List;
 
-import com.google.common.collect.ArrayListMultimap;
+import com.sun.codemodel.*;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.*;
 
 /**
  * A merge join combining to incoming in-order batches.
@@ -23,10 +29,55 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
   
+//  private static GeneratorMapping setup = GM("doSetup", "doSetup");
+//  private static GeneratorMapping copyLeft = GM("doSetup", "doCopyLeft");
+//  private static GeneratorMapping copyRight = GM("doSetup", "doCopyRight");
+//  private static GeneratorMapping compare = GM("doSetup", "doCompare");
+//  private static GeneratorMapping compareLeft= GM("doSetup", "doCompareNextLeftKey");
+//  
+//  private static final MappingSet SETUP_MAPPING = new MappingSet((String) null, null, setup, setup);
+//  private static final MappingSet COPY_LEFT_MAPPING = new MappingSet("leftIndex", "outIndex", copyLeft, copyLeft);
+//  private static final MappingSet COPY_RIGHT_MAPPING = new MappingSet("rightIndex", "outIndex", copyRight, copyRight);
+//  private static final MappingSet COMPARE_MAPPING = new MappingSet("leftIndex", "rightIndex", compare, compare);
+//  private static final MappingSet COMPARE_RIGHT_MAPPING = new MappingSet("rightIndex", null, compare, compare);
+//  private static final MappingSet COMPARE_LEFT_MAPPING = new MappingSet("leftIndex", "null", compareLeft, compareLeft);
+//  private static final MappingSet COMPARE_NEXT_LEFT_MAPPING = new MappingSet("nextLeftIndex", "null", compareLeft, compareLeft);
+//  
+  public static final MappingSet SETUP_MAPPING =
+      new MappingSet("null", "null", 
+                     GM("doSetup", "doSetup", null, null),
+                     GM("doSetup", "doSetup", null, null));
+  public static final MappingSet COPY_LEFT_MAPPING =
+      new MappingSet("leftIndex", "outIndex",
+                     GM("doSetup", "doCopyLeft", null, null),
+                     GM("doSetup", "doCopyLeft", null, null));
+  public static final MappingSet COPY_RIGHT_MAPPING =
+      new MappingSet("rightIndex", "outIndex",
+                     GM("doSetup", "doCopyRight", null, null),
+                     GM("doSetup", "doCopyRight", null, null));
+  public static final MappingSet COMPARE_MAPPING =
+      new MappingSet("leftIndex", "rightIndex",
+                     GM("doSetup", "doCompare", null, null),
+                     GM("doSetup", "doCompare", null, null));
+  public static final MappingSet COMPARE_RIGHT_MAPPING =
+      new MappingSet("rightIndex", "null",
+                     GM("doSetup", "doCompare", null, null),
+                     GM("doSetup", "doCompare", null, null));
+  public static final MappingSet COMPARE_LEFT_MAPPING =
+      new MappingSet("leftIndex", "null",
+                     GM("doSetup", "doCompareNextLeftKey", null, null),
+                     GM("doSetup", "doCompareNextLeftKey", null, null));
+  public static final MappingSet COMPARE_NEXT_LEFT_MAPPING =
+      new MappingSet("nextLeftIndex", "null",
+                     GM("doSetup", "doCompareNextLeftKey", null, null),
+                     GM("doSetup", "doCompareNextLeftKey", null, null));
+
+  
   private final RecordBatch left;
   private final RecordBatch right;
   private final JoinStatus status;
   private JoinWorker worker;
+  private JoinCondition condition;
   public MergeJoinBatchBuilder batchBuilder;
   
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
@@ -35,11 +86,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     this.right = right;
     this.status = new JoinStatus(left, right, this);
     this.batchBuilder = new MergeJoinBatchBuilder(context, status);
+    this.condition = popConfig.getConditions().get(0);
+    // currently only one join condition is supported
+    assert popConfig.getConditions().size() == 1;
   }
 
   @Override
   public int getRecordCount() {
-    return status.outputPosition;
+    return status.getOutPosition();
   }
 
   @Override
@@ -50,24 +104,31 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     
     // loop so we can start over again if we find a new batch was created.
     while(true){
-      
+
+      // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
+      if (status.getOutcome() == JoinOutcome.BATCH_RETURNED ||
+          status.getOutcome() == JoinOutcome.SCHEMA_CHANGED)
+        allocateBatch();
+
+      // reset the output position to zero after our parent iterates this RecordBatch
+      if (status.getOutcome() == JoinOutcome.BATCH_RETURNED ||
+          status.getOutcome() == JoinOutcome.SCHEMA_CHANGED ||
+          status.getOutcome() == JoinOutcome.NO_MORE_DATA)
+        status.resetOutputPos();
+
       boolean first = false;
       if(worker == null){
         try {
-          this.worker = getNewWorker();
+          logger.debug("Creating New Worker");
+          this.worker = generateNewWorker();
           first = true;
-        } catch (ClassTransformationException | IOException e) {
+        } catch (ClassTransformationException | IOException | SchemaChangeException e) {
           context.fail(new SchemaChangeException(e));
           kill();
           return IterOutcome.STOP;
         }
       }
 
-      // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
-      if(status.getOutcome() == JoinOutcome.BATCH_RETURNED || status.getOutcome() == JoinOutcome.SCHEMA_CHANGED){
-        allocateBatch();
-      }
-
       // join until we have a complete outgoing batch
       worker.doJoin(status);
 
@@ -75,17 +136,19 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       switch(status.getOutcome()){
       case BATCH_RETURNED:
         // only return new schema if new worker has been setup.
+        logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
         return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
       case FAILURE:
         kill();
         return IterOutcome.STOP;
       case NO_MORE_DATA:
-        return status.outputPosition > 0 ? IterOutcome.OK: IterOutcome.NONE;
-      case MODE_CHANGED:
+        logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? "OK" : "NONE"));
+        return status.getOutPosition() > 0 ? IterOutcome.OK: IterOutcome.NONE;
       case SCHEMA_CHANGED:
         worker = null;
-        if(status.outputPosition > 0){
+        if(status.getOutPosition() > 0){
           // if we have current data, let's return that.
+          logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
           return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
         }else{
           // loop again to rebuild worker.
@@ -113,23 +176,243 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     right.kill();
   }
 
-  private JoinWorker getNewWorker() throws ClassTransformationException, IOException{
-    CodeGenerator<JoinWorker> cg = new CodeGenerator<JoinWorker>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+  private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{
 
-    // if (status.rightSourceMode)
-      // generate copier which deref's SV4
-    // else
-      // generate direct copier.
-    
-    // generate comparator.
-    // generate compareNextLeftKey.
+    final CodeGenerator<JoinWorker> cg = new CodeGenerator<>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    final ErrorCollector collector = new ErrorCollectorImpl();
+    final LogicalExpression leftFieldExpr = condition.getLeft();
+    final LogicalExpression rightFieldExpr = condition.getRight();
+
+    // Generate members and initialization code
+    /////////////////////////////////////////
+
+    // declare and assign JoinStatus member
+    cg.setMappingSet(SETUP_MAPPING);
+    JClass joinStatusClass = cg.getModel().ref(JoinStatus.class);
+    JVar joinStatus = cg.clazz.field(JMod.NONE, joinStatusClass, "status");
+    cg.getSetupBlock().assign(JExpr._this().ref(joinStatus), JExpr.direct("status"));
+
+    // declare and assign outgoing VectorContainer member
+    JClass vectorContainerClass = cg.getModel().ref(VectorContainer.class);
+    JVar outgoingVectorContainer = cg.clazz.field(JMod.NONE, vectorContainerClass, "outgoing");
+    cg.getSetupBlock().assign(JExpr._this().ref(outgoingVectorContainer), JExpr.direct("outgoing"));
+
+    // declare and assign incoming left RecordBatch member
+    JClass recordBatchClass = cg.getModel().ref(RecordBatch.class);
+    JVar incomingLeftRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incomingLeft");
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingLeftRecordBatch), joinStatus.ref("left"));
+
+    // declare and assign incoming right RecordBatch member
+    JVar incomingRightRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incomingRight");
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingRightRecordBatch), joinStatus.ref("right"));
+
+    // declare 'incoming' member so VVReadExpr generated code can point to the left or right batch
+    JVar incomingRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incoming");
+
+    // materialize value vector readers from join expression
+    final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector);
+    if (collector.hasErrors())
+      throw new ClassTransformationException(String.format(
+          "Failure while trying to materialize incoming left field.  Errors:\n %s.", collector.toErrorString()));
+
+    final LogicalExpression materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector);
+    if (collector.hasErrors())
+      throw new ClassTransformationException(String.format(
+          "Failure while trying to materialize incoming right field.  Errors:\n %s.", collector.toErrorString()));
+
+
+    // generate compare()
+    ////////////////////////
+    cg.setMappingSet(COMPARE_MAPPING);
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
+    CodeGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+    cg.setMappingSet(COMPARE_RIGHT_MAPPING);
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
+    CodeGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(materializedRightExpr, false);
+
+    if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) {
+      // handle null == null
+      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+          .cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
+          ._then()
+          ._return(JExpr.lit(0));
+  
+      // handle null == !null
+      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+          .cor(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
+          ._then()
+          ._return(JExpr.lit(1));
+
+    } else if (compareLeftExprHolder.isOptional()) {
+      // handle null == required (null is less than any value)
+      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)))
+          ._then()
+          ._return(JExpr.lit(-1));
+
+    } else if (compareRightExprHolder.isOptional()) {
+      // handle required == null (null is less than any value)
+      cg.getEvalBlock()._if(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))
+          ._then()
+          ._return(JExpr.lit(1));
+    }
+
+    // equality
+    cg.getEvalBlock()._if(compareLeftExprHolder.getValue().eq(compareRightExprHolder.getValue()))
+                     ._then()
+                       ._return(JExpr.lit(0));
+    // less than
+    cg.getEvalBlock()._if(compareLeftExprHolder.getValue().lt(compareRightExprHolder.getValue()))
+                     ._then()
+                       ._return(JExpr.lit(-1));
+    // greater than
+    cg.getEvalBlock()._return(JExpr.lit(1));
+
+
+    // generate compareNextLeftKey()
+    ////////////////////////////////
+    cg.setMappingSet(COMPARE_LEFT_MAPPING);
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
+
+    // int nextLeftIndex = leftIndex + 1;
+    cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "nextLeftIndex", JExpr.direct("leftIndex").plus(JExpr.lit(1)));
+
+    // check if the next key is in this batch
+    cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false)))
+                     ._then()
+                       ._return(JExpr.lit(-1));
+
+    // generate VV read expressions
+    CodeGenerator.HoldingContainer compareThisLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+    cg.setMappingSet(COMPARE_NEXT_LEFT_MAPPING); // change mapping from 'leftIndex' to 'nextLeftIndex'
+    CodeGenerator.HoldingContainer compareNextLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+
+    if (compareThisLeftExprHolder.isOptional()) {
+      // handle null == null
+      cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+                            .cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
+                       ._then()
+                         ._return(JExpr.lit(0));
+  
+      // handle null == !null
+      cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+                            .cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
+                       ._then()
+                         ._return(JExpr.lit(1));
+    }
+
+    // check value equality
+    cg.getEvalBlock()._if(compareThisLeftExprHolder.getValue().eq(compareNextLeftExprHolder.getValue()))
+                     ._then()
+                       ._return(JExpr.lit(0));
+
+    // no match if reached
+    cg.getEvalBlock()._return(JExpr.lit(1));
+
+
+    // generate copyLeft()
+    //////////////////////
+    cg.setMappingSet(COPY_LEFT_MAPPING);
+    int vectorId = 0;
+    for (VectorWrapper<?> vw : left) {
+      JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft",
+                                                      new TypedFieldId(vw.getField().getType(), vectorId));
+      JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
+                                                       new TypedFieldId(vw.getField().getType(),vectorId, true));
+      // todo: check for room in vvOut
+      cg.getEvalBlock().add(vvOut.invoke("copyFrom")
+                                   .arg(COPY_LEFT_MAPPING.getValueReadIndex())
+                                   .arg(COPY_LEFT_MAPPING.getValueWriteIndex())
+                                   .arg(vvIn));
+      ++vectorId;
+    }
+    cg.getEvalBlock()._return(JExpr.lit(true));
+
+    // generate copyRight()
+    ///////////////////////
+    cg.setMappingSet(COPY_RIGHT_MAPPING);
+
+    int rightVectorBase = vectorId;
+    for (VectorWrapper<?> vw : right) {
+      JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight",
+                                                      new TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase));
+      JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
+                                                       new TypedFieldId(vw.getField().getType(),vectorId, true));
+      cg.getEvalBlock().add(vvOut.invoke("copyFrom")
+          .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
+          .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
+          .arg(vvIn));
+      ++vectorId;
+    }
+    cg.getEvalBlock()._return(JExpr.lit(true));
 
     JoinWorker w = context.getImplementationClass(cg);
-    w.setupJoin(status, this.container);
+    w.setupJoin(context, status, this.container);
     return w;
   }
 
-  private void allocateBatch(){
+  private void allocateBatch() {
     // allocate new batch space.
+    container.clear();
+
+    // add fields from both batches
+    for (VectorWrapper<?> w : left) {
+      ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+      getAllocator(w.getValueVector(), outgoingVector).alloc(left.getRecordCount() * 4);
+      container.add(outgoingVector);
+    }
+
+    for (VectorWrapper<?> w : right) {
+      ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+      getAllocator(w.getValueVector(), outgoingVector).alloc(right.getRecordCount() * 4);
+      container.add(outgoingVector);
+    }
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    logger.debug("Built joined schema: {}", container.getSchema());
+  }
+
+  private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
+    if(outgoing instanceof FixedWidthVector){
+      return new FixedVectorAllocator((FixedWidthVector) outgoing);
+    }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){
+      return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing);
+    }else{
+      throw new UnsupportedOperationException();
+    }
   }
+
+  private class FixedVectorAllocator implements VectorAllocator{
+    FixedWidthVector out;
+
+    public FixedVectorAllocator(FixedWidthVector out) {
+      super();
+      this.out = out;
+    }
+
+    public void alloc(int recordCount){
+      out.allocateNew(recordCount);
+      out.getMutator().setValueCount(recordCount);
+    }
+  }
+
+  private class VariableVectorAllocator implements VectorAllocator{
+    VariableWidthVector in;
+    VariableWidthVector out;
+
+    public VariableVectorAllocator(VariableWidthVector in, VariableWidthVector out) {
+      super();
+      this.in = in;
+      this.out = out;
+    }
+
+    public void alloc(int recordCount){
+      out.allocateNew(in.getByteCapacity(), recordCount);
+      out.getMutator().setValueCount(recordCount);
+    }
+  }
+
+  public interface VectorAllocator{
+    public void alloc(int recordCount);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
index d75cfb9..85ca43d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -40,19 +40,20 @@ public class MergeJoinBatchBuilder {
   private JoinStatus status;
 
   public MergeJoinBatchBuilder(FragmentContext context, JoinStatus status) {
+    this.container = new VectorContainer();
     this.status = status;
     this.svAllocator = context.getAllocator().getPreAllocator();
   }
 
   public boolean add(RecordBatch batch) {
     if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE)
-      throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+      throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch.");
     if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
 
     // resource checks
     long batchBytes = getSize(batch);
     if (batchBytes + runningBytes > Integer.MAX_VALUE) return false;      // TODO: 2GB is arbitrary
-    if (runningBatches + 1 > Character.MAX_VALUE) return false;           // allowed in batch.
+    if (runningBatches++ >= Character.MAX_VALUE) return false;            // batch-count limit reached.
     if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
 
     // transfer VVs to a new RecordBatchData
@@ -73,7 +74,6 @@ public class MergeJoinBatchBuilder {
 
   public void build() throws SchemaChangeException {
     container.clear();
-//    if (queuedRightBatches.keySet().size() > 1) throw new SchemaChangeException("Join currently only supports a single schema.");
     if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
     status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
     BatchSchema schema = queuedRightBatches.keySet().iterator().next();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
new file mode 100644
index 0000000..1b65227
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 2);
+    return new MergeJoinBatch(config, context, children.get(0), children.get(1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
new file mode 100644
index 0000000..38b8225
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -0,0 +1,220 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+
+/** Runs JSON physical plans containing a merge-join operator and checks how many records each plan produces. */
+public class TestMergeJoin {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestMergeJoin.class);
+
+  DrillConfig c = DrillConfig.create();
+
+  /**
+   * Executes the plan in /join/merge_join.json, prints each output batch as a header line followed by one
+   * line per record, and expects 100 records in total.
+   */
+  @Test
+  public void simpleEqualityJoin(@Injectable final DrillbitContext bitContext,
+                                 @Injectable UserServer.UserClientConnection connection) throws Throwable {
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/join/merge_join.json"), Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+    int totalRecordCount = 0;
+    while (exec.next()) {
+      totalRecordCount += exec.getRecordCount();
+      for (ValueVector v : exec)
+        System.out.print("[" + v.getField().getName() + "]        ");
+      System.out.println("\n");
+      for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+        List<Object> row = new ArrayList<Object>();
+        for (ValueVector v : exec) {
+          row.add(v.getAccessor().getObject(valueIdx));
+        }
+        for (Object cell : row) {
+          if (cell == null) {
+            System.out.print("<null>          ");
+            continue;
+          }
+          int len = cell.toString().length();
+          System.out.print(cell);
+          for (int i = 0; i < (14 - len); ++i)
+            System.out.print(" ");
+        }
+        System.out.println();
+      }
+      System.out.println();
+    }
+    System.out.println("Total Record Count: " + totalRecordCount);
+    // Rethrow the fragment's recorded failure before checking the count, so a count mismatch cannot hide the root cause.
+    if (context.getFailureCause() != null)
+      throw context.getFailureCause();
+    assertTrue(!context.isFailed());
+    assertEquals(100, totalRecordCount);
+  }
+
+  /**
+   * Executes /join/merge_single_batch.json with #{LEFT_FILE} and #{RIGHT_FILE} replaced by the URIs of the
+   * merge_single_batch.left.json / .right.json resources, prints every row as name:value cells and expects 25 rows.
+   * NOTE(review): those resources hold 23 matching (a == aa) pairs; 25 presumably also counts the two left rows
+   * (a=5, a=8) that have no right-side match -- confirm against the join implementation.
+   */
+  @Test
+  public void orderedEqualityJoin(@Injectable final DrillbitContext bitContext,
+                                  @Injectable UserServer.UserClientConnection connection) throws Throwable {
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(
+        Files.toString(
+            FileUtils.getResourceAsFile("/join/merge_single_batch.json"), Charsets.UTF_8)
+            .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
+            .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+    int totalRecordCount = 0;
+    while (exec.next()) {
+      totalRecordCount += exec.getRecordCount();
+      System.out.println("got next with record count: " + exec.getRecordCount() + " (total: " + totalRecordCount + "):");
+      System.out.println("       t1                 t2");
+      for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+        List<Object> row = Lists.newArrayList();
+        for (ValueVector v : exec)
+          row.add(v.getField().getName() + ":" + v.getAccessor().getObject(valueIdx));
+        for (Object cell : row) {
+          int len = cell.toString().length();
+          System.out.print(cell + " ");
+          for (int i = 0; i < (10 - len); ++i)
+            System.out.print(" ");
+        }
+        System.out.println();
+      }
+    }
+    System.out.println("Total Record Count: " + totalRecordCount);
+    // Rethrow the fragment's recorded failure before checking the count, so a count mismatch cannot hide the root cause.
+    if (context.getFailureCause() != null)
+      throw context.getFailureCause();
+    assertTrue(!context.isFailed());
+    assertEquals(25, totalRecordCount);
+  }
+
+  /**
+   * Same check as the single-batch test, but against /join/merge_multi_batch.json and its left/right resources;
+   * the plan reader is additionally given a StorageEngineRegistry built from the mocked DrillbitContext.
+   * NOTE(review): the resources hold 23 matching (a == aa) pairs; the expected 25 presumably also counts the two
+   * left rows (a=5, a=8) that have no right-side match -- confirm against the join implementation.
+   */
+  @Test
+  public void orderedEqualityMultiBatchJoin(@Injectable final DrillbitContext bitContext,
+                                            @Injectable UserServer.UserClientConnection connection) throws Throwable {
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getConfig(); result = c;
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
+    PhysicalPlan plan = reader.readPhysicalPlan(
+        Files.toString(
+            FileUtils.getResourceAsFile("/join/merge_multi_batch.json"), Charsets.UTF_8)
+            .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.left.json").toURI().toString())
+            .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.right.json").toURI().toString()));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+    int totalRecordCount = 0;
+    while (exec.next()) {
+      totalRecordCount += exec.getRecordCount();
+      System.out.println("got next with record count: " + exec.getRecordCount() + " (total: " + totalRecordCount + "):");
+      for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+        List<Object> row = Lists.newArrayList();
+        for (ValueVector v : exec)
+          row.add(v.getField().getName() + ":" + v.getAccessor().getObject(valueIdx));
+        for (Object cell : row) {
+          int len = cell.toString().length();
+          System.out.print(cell + " ");
+          for (int i = 0; i < (10 - len); ++i)
+            System.out.print(" ");
+        }
+        System.out.println();
+      }
+    }
+    System.out.println("Total Record Count: " + totalRecordCount);
+    // Rethrow the fragment's recorded failure before checking the count, so a count mismatch cannot hide the root cause.
+    if (context.getFailureCause() != null)
+      throw context.getFailureCause();
+    assertTrue(!context.isFailed());
+    assertEquals(25, totalRecordCount);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception{
+    // Pause for one second after the last test to give the logger time to catch up.
+    Thread.sleep(1000);
+  }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
new file mode 100644
index 0000000..e6a92bb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
@@ -0,0 +1,52 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 100, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id:2,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 50, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]},
+        {records: 50, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id: 3,
+      right: 1,
+      left: 2,
+      pop: "merge-join",
+      join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
new file mode 100644
index 0000000..ebdfd38
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
@@ -0,0 +1,47 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"json-sub-scan",
+      readEntries:[
+        {path: "#{LEFT_FILE}"}
+      ],
+      engineConfig:{
+         "type":"json",
+         "dfsName" : "file:///"
+      }
+      
+    },
+    {
+      @id:2,
+      pop:"json-sub-scan",
+      readEntries:[
+        {path: "#{RIGHT_FILE}"}
+      ],
+      engineConfig:{
+         "type":"json",
+         "dfsName" : "file:///"
+      }
+      
+    },
+    {
+      @id: 3,
+      left: 1,
+      right: 2,
+      pop: "merge-join",
+      join-conditions: [ { relationship: "==", left: "a", right: "aa" } ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
new file mode 100644
index 0000000..8cf4640
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
@@ -0,0 +1,13 @@
+{"a":1, "b":100, "leftbatch0": 0}
+{"a":1, "b":200, "leftbatch0": 0}
+{"a":1, "b":300, "leftbatch0": 0}
+{"a":2, "b":400, "leftbatch0": 0}
+{"a":2, "b":500, "leftbatch0": 0}
+{"a":2, "b":600, "leftbatch1": 1}
+{"a":3, "b":700, "leftbatch1": 1}
+{"a":4, "b":800, "leftbatch1": 1}
+{"a":5, "b":900, "leftbatch1": 1}
+{"a":6, "b":1000, "leftbatch1": 1}
+{"a":7, "b":1100, "leftbatch1": 1}
+{"a":8, "b":1200, "leftbatch1": 1}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
new file mode 100644
index 0000000..30aa62d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
@@ -0,0 +1,11 @@
+{"bb":10000, "aa":1, "rightbatch0": 0}
+{"bb":20000, "aa":2, "rightbatch0": 0}
+{"bb":30000, "aa":2, "rightbatch0": 0}
+{"bb":40000, "aa":2, "rightbatch0": 0}
+{"bb":50000, "aa":2, "rightbatch1": 1}
+{"bb":60000, "aa":2, "rightbatch1": 1}
+{"bb":70000, "aa":3, "rightbatch1": 1}
+{"bb":80000, "aa":4, "rightbatch2": 2}
+{"bb":90000, "aa":6, "rightbatch3": 3}
+{"bb":100000, "aa":7, "rightbatch4": 4}
+{"bb":110000, "aa":7, "rightbatch5": 5}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
new file mode 100644
index 0000000..0e4f79d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
@@ -0,0 +1,37 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"json-sub-scan",
+      entries:[
+        {url: "#{LEFT_FILE}"}
+      ]
+    },
+    {
+      @id:2,
+      pop:"json-sub-scan",
+      entries:[
+        {url: "#{RIGHT_FILE}"}
+      ]
+    },
+    {
+      @id: 3,
+      left: 1,
+      right: 2,
+      pop: "merge-join",
+      join-conditions: [ { relationship: "==", left: "a", right: "aa" } ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
new file mode 100644
index 0000000..e7bab08
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
@@ -0,0 +1,13 @@
+{"a":1, "b":100}
+{"a":1, "b":200}
+{"a":1, "b":300}
+{"a":2, "b":400}
+{"a":2, "b":500}
+{"a":2, "b":600}
+{"a":3, "b":700}
+{"a":4, "b":800}
+{"a":5, "b":900}
+{"a":6, "b":1000}
+{"a":7, "b":1100}
+{"a":8, "b":1200}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
new file mode 100644
index 0000000..d99a145
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
@@ -0,0 +1,11 @@
+{"bb":10000, "aa":1}
+{"bb":20000, "aa":2}
+{"bb":30000, "aa":2}
+{"bb":40000, "aa":2}
+{"bb":50000, "aa":2}
+{"bb":60000, "aa":2}
+{"bb":70000, "aa":3}
+{"bb":80000, "aa":4}
+{"bb":90000, "aa":6}
+{"bb":100000, "aa":7}
+{"bb":110000, "aa":7}