You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/29 05:43:36 UTC
[3/7] git commit: DRILL-190 (part2) - MergeJoinBatch handles record
batches - JoinStatus tracks state across input and output batches -
MergeJoinBatchBuilder builds a selection vector of right-side batches which
may be rescanned - implement code stubs for merge join
DRILL-190 (part2)
- MergeJoinBatch handles record batches
- JoinStatus tracks state across input and output batches
- MergeJoinBatchBuilder builds a selection vector of right-side batches which may be rescanned
- implement code stubs for merge join
- add field expression parsing and start of generated merge join code
- code generator support for merge-join's copyLeft(), copyRight(), compare() and compareNextLeftKey()
- add line prefixes to generated code log
- support VectorContainers in declareVectorValueSetupAndMember()
- fix vector allocation in MergeJoinBatch
- fix missing values from left batch when right batch has been exhausted
- fix nullable handling in generated merge join code. make simple merge join test use multiple batches.
- fixes for sv4 batch support, additional multi batch test
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e0bac2f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e0bac2f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e0bac2f0
Branch: refs/heads/master
Commit: e0bac2f0064be181fd03c18e1bfb243492cd1792
Parents: 8ceee5d
Author: Ben Becker <be...@gmail.com>
Authored: Thu Aug 15 21:16:27 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:36:45 2013 -0700
----------------------------------------------------------------------
.../drill/exec/compile/JaninoClassCompiler.java | 23 +-
.../exec/compile/sig/GeneratorMapping.java | 5 +
.../physical/base/AbstractPhysicalVisitor.java | 4 +-
.../drill/exec/physical/base/GroupScan.java | 4 +
.../exec/physical/base/PhysicalVisitor.java | 4 +-
.../drill/exec/physical/base/SubScan.java | 4 +
.../exec/physical/config/MergeJoinPOP.java | 4 +
.../drill/exec/physical/impl/ImplCreator.java | 9 +
.../exec/physical/impl/join/JoinEvaluator.java | 9 +-
.../physical/impl/join/JoinInnerSignature.java | 35 ++
.../exec/physical/impl/join/JoinStatus.java | 80 +++--
.../exec/physical/impl/join/JoinTemplate.java | 110 ++++--
.../exec/physical/impl/join/JoinWorker.java | 14 +-
.../exec/physical/impl/join/MergeJoinBatch.java | 341 +++++++++++++++++--
.../impl/join/MergeJoinBatchBuilder.java | 6 +-
.../physical/impl/join/MergeJoinCreator.java | 38 +++
.../exec/physical/impl/join/TestMergeJoin.java | 220 ++++++++++++
.../src/test/resources/join/merge_join.json | 52 +++
.../test/resources/join/merge_multi_batch.json | 47 +++
.../resources/join/merge_multi_batch.left.json | 13 +
.../resources/join/merge_multi_batch.right.json | 11 +
.../test/resources/join/merge_single_batch.json | 37 ++
.../resources/join/merge_single_batch.left.json | 13 +
.../join/merge_single_batch.right.json | 11 +
24 files changed, 996 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
index abe2afe..154aca4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
@@ -44,7 +44,9 @@ public class JaninoClassCompiler implements ClassCompiler{
}
public byte[] getClassByteCode(final String className, final String code) throws CompileException, IOException, ClassNotFoundException, ClassTransformationException {
- logger.debug("Compiling:\n {}", code);
+ if(logger.isDebugEnabled()){
+ logger.debug("Compiling:\n {}", prefixLineNumbers(code));
+ }
StringReader reader = new StringReader(code);
Scanner scanner = new Scanner((String) null, reader);
Java.CompilationUnit compilationUnit = new Parser(scanner).parseCompilationUnit();
@@ -55,6 +57,25 @@ public class JaninoClassCompiler implements ClassCompiler{
return classFiles[0].toByteArray();
}
+
+ private String prefixLineNumbers(String code) {
+ if (!debugLines) return code;
+ StringBuilder out = new StringBuilder();
+ int i = 1;
+ for (String line : code.split("\n")) {
+ int start = out.length();
+ out.append(i++);
+ int numLength = out.length() - start;
+ out.append(":");
+ for (int spaces = 0; spaces < 7 - numLength; ++spaces){
+ out.append(" ");
+ }
+ out.append(line);
+ out.append('\n');
+ }
+ return out.toString();
+ }
+
public void setDebuggingInformation(boolean debugSource, boolean debugLines, boolean debugVars) {
this.debugSource = debugSource;
this.debugLines = debugLines;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
index 8646b9b..09639df 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
@@ -12,6 +12,7 @@ public class GeneratorMapping {
private String reset;
private String cleanup;
+
public GeneratorMapping(String setup, String eval, String reset, String cleanup) {
super();
this.setup = setup;
@@ -20,6 +21,10 @@ public class GeneratorMapping {
this.cleanup = cleanup;
}
+ public static GeneratorMapping GM(String setup, String eval){
+ return create(setup, eval, null, null);
+ }
+
public static GeneratorMapping GM(String setup, String eval, String reset, String cleanup){
return create(setup, eval, reset, cleanup);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index ad41452..c997db4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.config.RangeSender;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.config.UnionExchange;
public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
@@ -38,7 +40,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
}
@Override
- public T visitUnion(UnionExchange union, X value) throws E {
+ public T visitUnion(Union union, X value) throws E {
return visitOp(union, value);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index acafd6c..870792a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -25,6 +25,10 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.fasterxml.jackson.annotation.JsonProperty;
+/**
+ * A GroupScan operator represents all data which will be scanned by a given physical
+ * plan. It is the superset of all SubScans for the plan.
+ */
public interface GroupScan extends Scan, HasAffinity{
public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 5f50422..97e6795 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.config.RangeSender;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.config.UnionExchange;
/**
@@ -45,7 +47,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
- public RETURN visitUnion(UnionExchange union, EXTRA value) throws EXCEP;
+ public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
index f75ba19..9d00c82 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
@@ -19,5 +19,9 @@ package org.apache.drill.exec.physical.base;
import org.apache.drill.exec.physical.ReadEntry;
+/**
+ * A SubScan operator represents the data scanned by a particular major/minor fragment. This is in contrast to
+ * a GroupScan operator, which represents all data scanned by a physical plan.
+ */
public interface SubScan extends Scan {
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 05fee19..19351cc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -61,4 +61,8 @@ public class MergeJoinPOP extends AbstractBase{
public Iterator<PhysicalOperator> iterator() {
return Iterators.forArray(left, right);
}
+
+ public List<JoinCondition> getConditions() {
+ return conditions;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index fb4b371..9984454 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.physical.config.Screen;
@@ -37,7 +38,9 @@ import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
+import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
+import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
@@ -73,6 +76,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private SVRemoverCreator svc = new SVRemoverCreator();
private SortBatchCreator sbc = new SortBatchCreator();
private AggBatchCreator abc = new AggBatchCreator();
+ private MergeJoinCreator mjc = new MergeJoinCreator();
private RootExec root = null;
private ImplCreator(){}
@@ -118,6 +122,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
}
@Override
+ public RecordBatch visitMergeJoin(MergeJoinPOP op, FragmentContext context) throws ExecutionSetupException {
+ return mjc.getBatch(context, op, getChildren(op, context));
+ }
+
+ @Override
public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
Preconditions.checkArgument(root == null);
root = sc.getRoot(context, op, getChildren(op, context));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
index 42ca604..beb3e28 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
@@ -1,9 +1,10 @@
package org.apache.drill.exec.physical.impl.join;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorContainer;
public interface JoinEvaluator {
- public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
- public abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
- public abstract int compare(int leftPosition, int rightPosition);
+ public abstract void doSetup(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
new file mode 100644
index 0000000..1081244
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorContainer;
+
+
+public interface JoinInnerSignature extends CodeGeneratorSignature {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 8831006..c755e5f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -12,23 +12,24 @@ public final class JoinStatus {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
public static enum RightSourceMode {
- INCOMING_BATCHES, QUEUED_BATCHES;
+ INCOMING, SV4;
}
- public int leftPosition;
- private final RecordBatch left;
+ public final RecordBatch left;
+ private int leftPosition;
private IterOutcome lastLeft;
- public int rightPosition;
- public int svRightPosition;
- private final RecordBatch right;
+ public final RecordBatch right;
+ private int rightPosition;
+ private int svRightPosition;
private IterOutcome lastRight;
- public int outputPosition;
- public RightSourceMode rightSourceMode = RightSourceMode.INCOMING_BATCHES;
+ private int outputPosition;
+ public RightSourceMode rightSourceMode = RightSourceMode.INCOMING;
public MergeJoinBatch outputBatch;
public SelectionVector4 sv4;
+ public boolean ok = true;
private boolean initialSet = false;
private boolean leftRepeating = false;
@@ -52,11 +53,10 @@ public final class JoinStatus {
}
public final void advanceRight(){
- if (rightSourceMode == RightSourceMode.INCOMING_BATCHES)
+ if (rightSourceMode == RightSourceMode.INCOMING)
rightPosition++;
- else {
- // advance through queued batches
- }
+ else
+ svRightPosition++;
}
public final int getLeftPosition() {
@@ -64,7 +64,24 @@ public final class JoinStatus {
}
public final int getRightPosition() {
- return (rightSourceMode == RightSourceMode.INCOMING_BATCHES) ? rightPosition : svRightPosition;
+ return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
+ }
+
+ public final void setRightPosition(int pos) {
+ rightPosition = pos;
+ }
+
+
+ public final int getOutPosition() {
+ return outputPosition;
+ }
+
+ public final int fetchAndIncOutputPos() {
+ return outputPosition++;
+ }
+
+ public final void resetOutputPos() {
+ outputPosition = 0;
}
public final void notifyLeftRepeating() {
@@ -74,6 +91,7 @@ public final class JoinStatus {
public final void notifyLeftStoppedRepeating() {
leftRepeating = false;
+ svRightPosition = 0;
}
public final boolean isLeftRepeating() {
@@ -81,12 +99,11 @@ public final class JoinStatus {
}
public void setDefaultAdvanceMode() {
- rightSourceMode = RightSourceMode.INCOMING_BATCHES;
- rightPosition = 0;
+ rightSourceMode = RightSourceMode.INCOMING;
}
- public void setRepeatedAdvanceMode() {
- rightSourceMode = RightSourceMode.QUEUED_BATCHES;
+ public void setSV4AdvanceMode() {
+ rightSourceMode = RightSourceMode.SV4;
svRightPosition = 0;
}
@@ -95,7 +112,7 @@ public final class JoinStatus {
* Side effect: advances to next left batch if current left batch size is exceeded.
*/
public final boolean isLeftPositionAllowed(){
- if(!isNextLeftPositionInCurrentBatch()){
+ if(!isLeftPositionInCurrentBatch()){
leftPosition = 0;
lastLeft = left.next();
return lastLeft == IterOutcome.OK;
@@ -110,7 +127,10 @@ public final class JoinStatus {
* Side effect: advances to next right batch if current right batch size is exceeded
*/
public final boolean isRightPositionAllowed(){
- if(isNextRightPositionInCurrentBatch()){
+ if (rightSourceMode == RightSourceMode.SV4)
+ return svRightPosition < sv4.getCount();
+
+ if(!isRightPositionInCurrentBatch()){
rightPosition = 0;
lastRight = right.next();
return lastRight == IterOutcome.OK;
@@ -124,18 +144,34 @@ public final class JoinStatus {
/**
* Check if the left record position can advance by one in the current batch.
*/
- public final boolean isNextLeftPositionInCurrentBatch() {
+ public final boolean isLeftPositionInCurrentBatch() {
return leftPosition < left.getRecordCount();
}
/**
- * Check if the left record position can advance by one in the current batch.
+ * Check if the right record position can advance by one in the current batch.
*/
- public final boolean isNextRightPositionInCurrentBatch() {
+ public final boolean isRightPositionInCurrentBatch() {
return rightPosition < right.getRecordCount();
}
+ /**
+ * Check if the record following the current left position is still within the current left batch.
+ */
+ public final boolean isNextLeftPositionInCurrentBatch() {
+ return leftPosition + 1 < left.getRecordCount();
+ }
+
+ /**
+ * Check if the record following the current right position is still within the current right batch.
+ */
+ public final boolean isNextRightPositionInCurrentBatch() {
+ return rightPosition + 1 < right.getRecordCount();
+ }
+
public JoinOutcome getOutcome(){
+ if (!ok)
+ return JoinOutcome.FAILURE;
if (lastLeft == IterOutcome.OK && lastRight == IterOutcome.OK)
return JoinOutcome.BATCH_RETURNED;
if (eitherMatches(IterOutcome.OK_NEW_SCHEMA))
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 51cc5e5..5feb5ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -1,6 +1,9 @@
package org.apache.drill.exec.physical.impl.join;
-import org.apache.drill.exec.record.RecordBatch;
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorContainer;
/**
@@ -52,11 +55,10 @@ import org.apache.drill.exec.record.VectorContainer;
* - this is required since code may be regenerated before completion of an outgoing record batch.
*/
public abstract class JoinTemplate implements JoinWorker {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinTemplate.class);
@Override
- public void setupJoin(JoinStatus status, VectorContainer outgoing){
-
+ public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException {
+ doSetup(context, status, outgoing);
}
/**
@@ -67,53 +69,84 @@ public abstract class JoinTemplate implements JoinWorker {
while (true) {
// for each record
- // validate position and advance to the next record batch if necessary
- if (!status.isLeftPositionAllowed()) return;
- if (!status.isRightPositionAllowed()) return;
+ // validate input iterators (advancing to the next record batch if necessary)
+ if (!status.isRightPositionAllowed()) {
+ // we've hit the end of the right record batch; copy any remaining values from the left batch
+ while (status.isLeftPositionAllowed()) {
+ doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+ status.advanceLeft();
+ }
+ return;
+ }
+ if (!status.isLeftPositionAllowed())
+ return;
- int comparison = compare(status.leftPosition, status.rightPosition);
+ int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
switch (comparison) {
case -1:
// left key < right key
+ doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
status.advanceLeft();
continue;
case 0:
// left key == right key
+
+ // check for repeating values on the left side
if (!status.isLeftRepeating() &&
status.isNextLeftPositionInCurrentBatch() &&
- compareNextLeftKey(status.leftPosition) == 0) {
- // records in the left batch contain duplicate keys
- // TODO: leftHasDups = true, if next left key matches but is in a new batch
+ doCompareNextLeftKey(status.getLeftPosition()) == 0)
+ // subsequent record(s) in the left batch have the same key
status.notifyLeftRepeating();
- }
+
+ else if (status.isLeftRepeating() &&
+ status.isNextLeftPositionInCurrentBatch() &&
+ doCompareNextLeftKey(status.getLeftPosition()) != 0)
+ // this record marks the end of repeated keys
+ status.notifyLeftStoppedRepeating();
+ boolean crossedBatchBoundaries = false;
+ int initialRightPosition = status.getRightPosition();
do {
// copy all equal right keys to the output record batch
- if (!copy(status.leftPosition, status.rightPosition, status.outputPosition++))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
return;
- // If the left key has duplicates and we're about to cross batch boundaries, queue the
- // right table's record batch before calling next. These records will need to be copied
- // again for each duplicate left key.
+ if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos()))
+ return;
+
+ // If the left key has duplicates and we're about to cross a boundary in the right batch, add the
+ // right table's record batch to the sv4 builder before calling next. These records will need to be
+ // copied again for each duplicate left key.
if (status.isLeftRepeating() && !status.isNextRightPositionInCurrentBatch()) {
- // last record in right batch is a duplicate, and at the end of the batch
status.outputBatch.addRightToBatchBuilder();
+ crossedBatchBoundaries = true;
}
status.advanceRight();
- } while (status.isRightPositionAllowed() && compare(status.leftPosition, status.rightPosition) == 0);
+ } while (status.isRightPositionAllowed() && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0);
+
+ if (status.getRightPosition() > initialRightPosition && status.isLeftRepeating())
+ // more than one matching result from right table; reset position in case of subsequent left match
+ status.setRightPosition(initialRightPosition);
status.advanceLeft();
- if (status.isLeftRepeating() && compareNextLeftKey(status.leftPosition) != 0) {
+ if (status.isLeftRepeating() && doCompareNextLeftKey(status.getLeftPosition()) != 0) {
// left no longer has duplicates. switch back to incoming batch mode
status.setDefaultAdvanceMode();
status.notifyLeftStoppedRepeating();
- } else if (status.isLeftRepeating()) {
- // left is going to repeat; use sv4 for right batch
- status.setRepeatedAdvanceMode();
- }
+ } else if (status.isLeftRepeating() && crossedBatchBoundaries) {
+ try {
+ // build the queued right batches and switch the right side to sv4 advance mode
+ status.outputBatch.batchBuilder.build();
+ status.setSV4AdvanceMode();
+ } catch (SchemaChangeException e) {
+ status.ok = false;
+ }
+ // return to indicate recompile in right-sv4 mode
+ return;
+ }
continue;
@@ -128,17 +161,24 @@ public abstract class JoinTemplate implements JoinWorker {
}
}
-
+ // Generated Methods
+
+ public abstract void doSetup(@Named("context") FragmentContext context,
+ @Named("status") JoinStatus status,
+ @Named("outgoing") VectorContainer outgoing) throws SchemaChangeException;
+
+
/**
* Copy the data to the new record batch (if it fits).
*
* @param leftPosition position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
- * @param rightPosition position of batch (lower 16 bits) and record (upper 16 bits) in right SV4
* @param outputPosition position of the output record batch
* @return Whether or not the data was copied.
*/
- protected abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
-
+ public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
+ public abstract boolean doCopyRight(@Named("rightIndex") int rightIndex, @Named("outIndex") int outIndex);
+
+
/**
* Compare the values of the left and right join key to determine whether the left is less than, greater than
* or equal to the right.
@@ -149,7 +189,17 @@ public abstract class JoinTemplate implements JoinWorker {
* -1 if left is < right
* 1 if left is > right
*/
- protected abstract int compare(int leftPosition, int rightPosition);
- protected abstract int compareNextLeftKey(int position);
- public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
+ protected abstract int doCompare(@Named("leftIndex") int leftIndex,
+ @Named("rightIndex") int rightIndex);
+
+
+ /**
+ * Compare the current left key to the next left key, if it's within the batch.
+ * @return 0 if both keys are equal
+ * 1 if the keys are not equal
+ * -1 if there are no more keys in this batch
+ */
+ protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex);
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 54d2076..6708279 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -1,20 +1,20 @@
package org.apache.drill.exec.physical.impl.join;
import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorContainer;
public interface JoinWorker {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinWorker.class);
public static enum JoinOutcome {
- NO_MORE_DATA, BATCH_RETURNED, MODE_CHANGED, SCHEMA_CHANGED, WAITING, FAILURE;
+ NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE;
}
-
- public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<JoinWorker>( //
- JoinWorker.class, "org.apache.drill.exec.physical.impl.mergejoin.JoinTemplate", JoinEvaluator.class, null);
-
- public void setupJoin(JoinStatus status, VectorContainer outgoing);
+ public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
public void doJoin(JoinStatus status);
+
+ public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 4d633bb..a2b84da 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -1,20 +1,26 @@
package org.apache.drill.exec.physical.impl.join;
+import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
+
import java.io.IOException;
-import java.util.List;
-import com.google.common.collect.ArrayListMultimap;
+import com.sun.codemodel.*;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.*;
/**
* A merge join combining two incoming in-order batches.
@@ -23,10 +29,55 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
+// private static GeneratorMapping setup = GM("doSetup", "doSetup");
+// private static GeneratorMapping copyLeft = GM("doSetup", "doCopyLeft");
+// private static GeneratorMapping copyRight = GM("doSetup", "doCopyRight");
+// private static GeneratorMapping compare = GM("doSetup", "doCompare");
+// private static GeneratorMapping compareLeft= GM("doSetup", "doCompareNextLeftKey");
+//
+// private static final MappingSet SETUP_MAPPING = new MappingSet((String) null, null, setup, setup);
+// private static final MappingSet COPY_LEFT_MAPPING = new MappingSet("leftIndex", "outIndex", copyLeft, copyLeft);
+// private static final MappingSet COPY_RIGHT_MAPPING = new MappingSet("rightIndex", "outIndex", copyRight, copyRight);
+// private static final MappingSet COMPARE_MAPPING = new MappingSet("leftIndex", "rightIndex", compare, compare);
+// private static final MappingSet COMPARE_RIGHT_MAPPING = new MappingSet("rightIndex", null, compare, compare);
+// private static final MappingSet COMPARE_LEFT_MAPPING = new MappingSet("leftIndex", "null", compareLeft, compareLeft);
+// private static final MappingSet COMPARE_NEXT_LEFT_MAPPING = new MappingSet("nextLeftIndex", "null", compareLeft, compareLeft);
+//
+ public static final MappingSet SETUP_MAPPING =
+ new MappingSet("null", "null",
+ GM("doSetup", "doSetup", null, null),
+ GM("doSetup", "doSetup", null, null));
+ public static final MappingSet COPY_LEFT_MAPPING =
+ new MappingSet("leftIndex", "outIndex",
+ GM("doSetup", "doCopyLeft", null, null),
+ GM("doSetup", "doCopyLeft", null, null));
+ public static final MappingSet COPY_RIGHT_MAPPING =
+ new MappingSet("rightIndex", "outIndex",
+ GM("doSetup", "doCopyRight", null, null),
+ GM("doSetup", "doCopyRight", null, null));
+ public static final MappingSet COMPARE_MAPPING =
+ new MappingSet("leftIndex", "rightIndex",
+ GM("doSetup", "doCompare", null, null),
+ GM("doSetup", "doCompare", null, null));
+ public static final MappingSet COMPARE_RIGHT_MAPPING =
+ new MappingSet("rightIndex", "null",
+ GM("doSetup", "doCompare", null, null),
+ GM("doSetup", "doCompare", null, null));
+ public static final MappingSet COMPARE_LEFT_MAPPING =
+ new MappingSet("leftIndex", "null",
+ GM("doSetup", "doCompareNextLeftKey", null, null),
+ GM("doSetup", "doCompareNextLeftKey", null, null));
+ public static final MappingSet COMPARE_NEXT_LEFT_MAPPING =
+ new MappingSet("nextLeftIndex", "null",
+ GM("doSetup", "doCompareNextLeftKey", null, null),
+ GM("doSetup", "doCompareNextLeftKey", null, null));
+
+
private final RecordBatch left;
private final RecordBatch right;
private final JoinStatus status;
private JoinWorker worker;
+ private JoinCondition condition;
public MergeJoinBatchBuilder batchBuilder;
protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
@@ -35,11 +86,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
this.right = right;
this.status = new JoinStatus(left, right, this);
this.batchBuilder = new MergeJoinBatchBuilder(context, status);
+ this.condition = popConfig.getConditions().get(0);
+ // currently only one join condition is supported
+ assert popConfig.getConditions().size() == 1;
}
@Override
public int getRecordCount() {
- return status.outputPosition;
+ return status.getOutPosition();
}
@Override
@@ -50,24 +104,31 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
// loop so we can start over again if we find a new batch was created.
while(true){
-
+
+ // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
+ if (status.getOutcome() == JoinOutcome.BATCH_RETURNED ||
+ status.getOutcome() == JoinOutcome.SCHEMA_CHANGED)
+ allocateBatch();
+
+ // reset the output position to zero after our parent iterates this RecordBatch
+ if (status.getOutcome() == JoinOutcome.BATCH_RETURNED ||
+ status.getOutcome() == JoinOutcome.SCHEMA_CHANGED ||
+ status.getOutcome() == JoinOutcome.NO_MORE_DATA)
+ status.resetOutputPos();
+
boolean first = false;
if(worker == null){
try {
- this.worker = getNewWorker();
+ logger.debug("Creating New Worker");
+ this.worker = generateNewWorker();
first = true;
- } catch (ClassTransformationException | IOException e) {
+ } catch (ClassTransformationException | IOException | SchemaChangeException e) {
context.fail(new SchemaChangeException(e));
kill();
return IterOutcome.STOP;
}
}
- // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
- if(status.getOutcome() == JoinOutcome.BATCH_RETURNED || status.getOutcome() == JoinOutcome.SCHEMA_CHANGED){
- allocateBatch();
- }
-
// join until we have a complete outgoing batch
worker.doJoin(status);
@@ -75,17 +136,19 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
switch(status.getOutcome()){
case BATCH_RETURNED:
// only return new schema if new worker has been setup.
+ logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
case FAILURE:
kill();
return IterOutcome.STOP;
case NO_MORE_DATA:
- return status.outputPosition > 0 ? IterOutcome.OK: IterOutcome.NONE;
- case MODE_CHANGED:
+ logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? "OK" : "NONE"));
+ return status.getOutPosition() > 0 ? IterOutcome.OK: IterOutcome.NONE;
case SCHEMA_CHANGED:
worker = null;
- if(status.outputPosition > 0){
+ if(status.getOutPosition() > 0){
// if we have current data, let's return that.
+ logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
}else{
// loop again to rebuild worker.
@@ -113,23 +176,243 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
right.kill();
}
- private JoinWorker getNewWorker() throws ClassTransformationException, IOException{
- CodeGenerator<JoinWorker> cg = new CodeGenerator<JoinWorker>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{
- // if (status.rightSourceMode)
- // generate copier which deref's SV4
- // else
- // generate direct copier.
-
- // generate comparator.
- // generate compareNextLeftKey.
+ final CodeGenerator<JoinWorker> cg = new CodeGenerator<>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ final ErrorCollector collector = new ErrorCollectorImpl();
+ final LogicalExpression leftFieldExpr = condition.getLeft();
+ final LogicalExpression rightFieldExpr = condition.getRight();
+
+ // Generate members and initialization code
+ /////////////////////////////////////////
+
+ // declare and assign JoinStatus member
+ cg.setMappingSet(SETUP_MAPPING);
+ JClass joinStatusClass = cg.getModel().ref(JoinStatus.class);
+ JVar joinStatus = cg.clazz.field(JMod.NONE, joinStatusClass, "status");
+ cg.getSetupBlock().assign(JExpr._this().ref(joinStatus), JExpr.direct("status"));
+
+ // declare and assign outgoing VectorContainer member
+ JClass vectorContainerClass = cg.getModel().ref(VectorContainer.class);
+ JVar outgoingVectorContainer = cg.clazz.field(JMod.NONE, vectorContainerClass, "outgoing");
+ cg.getSetupBlock().assign(JExpr._this().ref(outgoingVectorContainer), JExpr.direct("outgoing"));
+
+ // declare and assign incoming left RecordBatch member
+ JClass recordBatchClass = cg.getModel().ref(RecordBatch.class);
+ JVar incomingLeftRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incomingLeft");
+ cg.getSetupBlock().assign(JExpr._this().ref(incomingLeftRecordBatch), joinStatus.ref("left"));
+
+ // declare and assign incoming right RecordBatch member
+ JVar incomingRightRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incomingRight");
+ cg.getSetupBlock().assign(JExpr._this().ref(incomingRightRecordBatch), joinStatus.ref("right"));
+
+ // declare 'incoming' member so VVReadExpr generated code can point to the left or right batch
+ JVar incomingRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incoming");
+
+ // materialize value vector readers from join expression
+ final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector);
+ if (collector.hasErrors())
+ throw new ClassTransformationException(String.format(
+ "Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString()));
+
+ final LogicalExpression materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector);
+ if (collector.hasErrors())
+ throw new ClassTransformationException(String.format(
+ "Failure while trying to materialize incoming right field. Errors:\n %s.", collector.toErrorString()));
+
+
+ // generate compare()
+ ////////////////////////
+ cg.setMappingSet(COMPARE_MAPPING);
+ cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
+ CodeGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+ cg.setMappingSet(COMPARE_RIGHT_MAPPING);
+ cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
+ CodeGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(materializedRightExpr, false);
+
+ if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) {
+ // handle null == null
+ cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+ .cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
+ ._then()
+ ._return(JExpr.lit(0));
+
+ // handle null == !null
+ cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+ .cor(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
+ ._then()
+ ._return(JExpr.lit(1));
+
+ } else if (compareLeftExprHolder.isOptional()) {
+ // handle null == required (null is less than any value)
+ cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)))
+ ._then()
+ ._return(JExpr.lit(-1));
+
+ } else if (compareRightExprHolder.isOptional()) {
+ // handle required == null (null is less than any value)
+ cg.getEvalBlock()._if(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))
+ ._then()
+ ._return(JExpr.lit(1));
+ }
+
+ // equality
+ cg.getEvalBlock()._if(compareLeftExprHolder.getValue().eq(compareRightExprHolder.getValue()))
+ ._then()
+ ._return(JExpr.lit(0));
+ // less than
+ cg.getEvalBlock()._if(compareLeftExprHolder.getValue().lt(compareRightExprHolder.getValue()))
+ ._then()
+ ._return(JExpr.lit(-1));
+ // greater than
+ cg.getEvalBlock()._return(JExpr.lit(1));
+
+
+ // generate compareNextLeftKey()
+ ////////////////////////////////
+ cg.setMappingSet(COMPARE_LEFT_MAPPING);
+ cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
+
+ // int nextLeftIndex = leftIndex + 1;
+ cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "nextLeftIndex", JExpr.direct("leftIndex").plus(JExpr.lit(1)));
+
+ // check if the next key is in this batch
+ cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false)))
+ ._then()
+ ._return(JExpr.lit(-1));
+
+ // generate VV read expressions
+ CodeGenerator.HoldingContainer compareThisLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+ cg.setMappingSet(COMPARE_NEXT_LEFT_MAPPING); // change mapping from 'leftIndex' to 'nextLeftIndex'
+ CodeGenerator.HoldingContainer compareNextLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+
+ if (compareThisLeftExprHolder.isOptional()) {
+ // handle null == null
+ cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+ .cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
+ ._then()
+ ._return(JExpr.lit(0));
+
+ // handle null == !null
+ cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+ .cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
+ ._then()
+ ._return(JExpr.lit(1));
+ }
+
+ // check value equality
+ cg.getEvalBlock()._if(compareThisLeftExprHolder.getValue().eq(compareNextLeftExprHolder.getValue()))
+ ._then()
+ ._return(JExpr.lit(0));
+
+ // no match if reached
+ cg.getEvalBlock()._return(JExpr.lit(1));
+
+
+ // generate copyLeft()
+ //////////////////////
+ cg.setMappingSet(COPY_LEFT_MAPPING);
+ int vectorId = 0;
+ for (VectorWrapper<?> vw : left) {
+ JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft",
+ new TypedFieldId(vw.getField().getType(), vectorId));
+ JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
+ new TypedFieldId(vw.getField().getType(),vectorId, true));
+ // todo: check for room in vvOut
+ cg.getEvalBlock().add(vvOut.invoke("copyFrom")
+ .arg(COPY_LEFT_MAPPING.getValueReadIndex())
+ .arg(COPY_LEFT_MAPPING.getValueWriteIndex())
+ .arg(vvIn));
+ ++vectorId;
+ }
+ cg.getEvalBlock()._return(JExpr.lit(true));
+
+ // generate copyRight()
+ ///////////////////////
+ cg.setMappingSet(COPY_RIGHT_MAPPING);
+
+ int rightVectorBase = vectorId;
+ for (VectorWrapper<?> vw : right) {
+ JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight",
+ new TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase));
+ JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
+ new TypedFieldId(vw.getField().getType(),vectorId, true));
+ cg.getEvalBlock().add(vvOut.invoke("copyFrom")
+ .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
+ .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
+ .arg(vvIn));
+ ++vectorId;
+ }
+ cg.getEvalBlock()._return(JExpr.lit(true));
JoinWorker w = context.getImplementationClass(cg);
- w.setupJoin(status, this.container);
+ w.setupJoin(context, status, this.container);
return w;
}
- private void allocateBatch(){
+ private void allocateBatch() {
// allocate new batch space.
+ container.clear();
+
+ // add fields from both batches
+ for (VectorWrapper<?> w : left) {
+ ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+ getAllocator(w.getValueVector(), outgoingVector).alloc(left.getRecordCount() * 4);
+ container.add(outgoingVector);
+ }
+
+ for (VectorWrapper<?> w : right) {
+ ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+ getAllocator(w.getValueVector(), outgoingVector).alloc(right.getRecordCount() * 4);
+ container.add(outgoingVector);
+ }
+
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ logger.debug("Built joined schema: {}", container.getSchema());
+ }
+
+ private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
+ if(outgoing instanceof FixedWidthVector){
+ return new FixedVectorAllocator((FixedWidthVector) outgoing);
+ }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){
+ return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing);
+ }else{
+ throw new UnsupportedOperationException();
+ }
}
+
+ private class FixedVectorAllocator implements VectorAllocator{
+ FixedWidthVector out;
+
+ public FixedVectorAllocator(FixedWidthVector out) {
+ super();
+ this.out = out;
+ }
+
+ public void alloc(int recordCount){
+ out.allocateNew(recordCount);
+ out.getMutator().setValueCount(recordCount);
+ }
+ }
+
+ private class VariableVectorAllocator implements VectorAllocator{
+ VariableWidthVector in;
+ VariableWidthVector out;
+
+ public VariableVectorAllocator(VariableWidthVector in, VariableWidthVector out) {
+ super();
+ this.in = in;
+ this.out = out;
+ }
+
+ public void alloc(int recordCount){
+ out.allocateNew(in.getByteCapacity(), recordCount);
+ out.getMutator().setValueCount(recordCount);
+ }
+ }
+
+ public interface VectorAllocator{
+ public void alloc(int recordCount);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
index d75cfb9..85ca43d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -40,19 +40,20 @@ public class MergeJoinBatchBuilder {
private JoinStatus status;
public MergeJoinBatchBuilder(FragmentContext context, JoinStatus status) {
+ this.container = new VectorContainer();
this.status = status;
this.svAllocator = context.getAllocator().getPreAllocator();
}
public boolean add(RecordBatch batch) {
if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE)
- throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+ throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch.");
if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
// resource checks
long batchBytes = getSize(batch);
if (batchBytes + runningBytes > Integer.MAX_VALUE) return false; // TODO: 2GB is arbitrary
- if (runningBatches + 1 > Character.MAX_VALUE) return false; // allowed in batch.
+ if (runningBatches++ >= Character.MAX_VALUE) return false; // allowed in batch.
if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
// transfer VVs to a new RecordBatchData
@@ -73,7 +74,6 @@ public class MergeJoinBatchBuilder {
public void build() throws SchemaChangeException {
container.clear();
-// if (queuedRightBatches.keySet().size() > 1) throw new SchemaChangeException("Join currently only supports a single schema.");
if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
BatchSchema schema = queuedRightBatches.keySet().iterator().next();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
new file mode 100644
index 0000000..1b65227
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.size() == 2);
+ return new MergeJoinBatch(config, context, children.get(0), children.get(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
new file mode 100644
index 0000000..38b8225
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -0,0 +1,220 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+
+public class TestMergeJoin {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestMergeJoin.class);
+
+ DrillConfig c = DrillConfig.create();
+
+ @Test
+ public void simpleEqualityJoin(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ }};
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/join/merge_join.json"), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ int totalRecordCount = 0;
+ while (exec.next()) {
+ totalRecordCount += exec.getRecordCount();
+ for (ValueVector v : exec)
+ System.out.print("[" + v.getField().getName() + "] ");
+ System.out.println("\n");
+ for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+ List<Object> row = new ArrayList();
+ for (ValueVector v : exec) {
+ row.add(v.getAccessor().getObject(valueIdx));
+ }
+ for (Object cell : row) {
+ if (cell == null) {
+ System.out.print("<null> ");
+ continue;
+ }
+ int len = cell.toString().length();
+ System.out.print(cell);
+ for (int i = 0; i < (14 - len); ++i)
+ System.out.print(" ");
+ }
+ System.out.println();
+ }
+ System.out.println();
+ }
+ assertEquals(100, totalRecordCount);
+ System.out.println("Total Record Count: " + totalRecordCount);
+ if (context.getFailureCause() != null)
+ throw context.getFailureCause();
+ assertTrue(!context.isFailed());
+
+ }
+
+ @Test
+ public void orderedEqualityJoin(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ }};
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(
+ Files.toString(
+ FileUtils.getResourceAsFile("/join/merge_single_batch.json"), Charsets.UTF_8)
+ .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
+ .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ int totalRecordCount = 0;
+ while (exec.next()) {
+ totalRecordCount += exec.getRecordCount();
+ System.out.println("got next with record count: " + exec.getRecordCount() + " (total: " + totalRecordCount + "):");
+ System.out.println(" t1 t2");
+
+ for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+ List<Object> row = Lists.newArrayList();
+ for (ValueVector v : exec)
+ row.add(v.getField().getName() + ":" + v.getAccessor().getObject(valueIdx));
+ for (Object cell : row) {
+ if (cell == null) {
+ System.out.print("<null> ");
+ continue;
+ }
+ int len = cell.toString().length();
+ System.out.print(cell + " ");
+ for (int i = 0; i < (10 - len); ++i)
+ System.out.print(" ");
+ }
+ System.out.println();
+ }
+ }
+ System.out.println("Total Record Count: " + totalRecordCount);
+ assertEquals(25, totalRecordCount);
+
+ if (context.getFailureCause() != null)
+ throw context.getFailureCause();
+ assertTrue(!context.isFailed());
+
+ }
+
+
+ @Test
+ public void orderedEqualityMultiBatchJoin(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getConfig(); result = c;
+ }};
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
+ PhysicalPlan plan = reader.readPhysicalPlan(
+ Files.toString(
+ FileUtils.getResourceAsFile("/join/merge_multi_batch.json"), Charsets.UTF_8)
+ .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.left.json").toURI().toString())
+ .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.right.json").toURI().toString()));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ int totalRecordCount = 0;
+ while (exec.next()) {
+ totalRecordCount += exec.getRecordCount();
+ System.out.println("got next with record count: " + exec.getRecordCount() + " (total: " + totalRecordCount + "):");
+
+ for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+ List<Object> row = Lists.newArrayList();
+ for (ValueVector v : exec)
+ row.add(v.getField().getName() + ":" + v.getAccessor().getObject(valueIdx));
+ for (Object cell : row) {
+ if (cell == null) {
+ System.out.print("<null> ");
+ continue;
+ }
+ int len = cell.toString().length();
+ System.out.print(cell + " ");
+ for (int i = 0; i < (10 - len); ++i)
+ System.out.print(" ");
+ }
+ System.out.println();
+ }
+ }
+ System.out.println("Total Record Count: " + totalRecordCount);
+ assertEquals(25, totalRecordCount);
+
+ if (context.getFailureCause() != null)
+ throw context.getFailureCause();
+ assertTrue(!context.isFailed());
+
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception{
+ // pause to get logger to catch up.
+ Thread.sleep(1000);
+ }
+
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
new file mode 100644
index 0000000..e6a92bb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
@@ -0,0 +1,52 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://source1.apache.org",
+ entries:[
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "INT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ pop:"mock-sub-scan",
+ url: "http://source2.apache.org",
+ entries:[
+ {records: 50, types: [
+ {name: "blue1", type: "INT", mode: "REQUIRED"},
+ {name: "red1", type: "INT", mode: "REQUIRED"},
+ {name: "green1", type: "INT", mode: "REQUIRED"}
+ ]},
+ {records: 50, types: [
+ {name: "blue1", type: "INT", mode: "REQUIRED"},
+ {name: "red1", type: "INT", mode: "REQUIRED"},
+ {name: "green1", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id: 3,
+ right: 1,
+ left: 2,
+ pop: "merge-join",
+ join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
new file mode 100644
index 0000000..ebdfd38
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
@@ -0,0 +1,47 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"json-sub-scan",
+ readEntries:[
+ {path: "#{LEFT_FILE}"}
+ ],
+ engineConfig:{
+ "type":"json",
+ "dfsName" : "file:///"
+ }
+
+ },
+ {
+ @id:2,
+ pop:"json-sub-scan",
+ readEntries:[
+ {path: "#{RIGHT_FILE}"}
+ ],
+ engineConfig:{
+ "type":"json",
+ "dfsName" : "file:///"
+ }
+
+ },
+ {
+ @id: 3,
+ left: 1,
+ right: 2,
+ pop: "merge-join",
+ join-conditions: [ { relationship: "==", left: "a", right: "aa" } ]
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
new file mode 100644
index 0000000..8cf4640
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
@@ -0,0 +1,13 @@
+{"a":1, "b":100, "leftbatch0": 0}
+{"a":1, "b":200, "leftbatch0": 0}
+{"a":1, "b":300, "leftbatch0": 0}
+{"a":2, "b":400, "leftbatch0": 0}
+{"a":2, "b":500, "leftbatch0": 0}
+{"a":2, "b":600, "leftbatch1": 1}
+{"a":3, "b":700, "leftbatch1": 1}
+{"a":4, "b":800, "leftbatch1": 1}
+{"a":5, "b":900, "leftbatch1": 1}
+{"a":6, "b":1000, "leftbatch1": 1}
+{"a":7, "b":1100, "leftbatch1": 1}
+{"a":8, "b":1200, "leftbatch1": 1}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
new file mode 100644
index 0000000..30aa62d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
@@ -0,0 +1,11 @@
+{"bb":10000, "aa":1, "rightbatch0": 0}
+{"bb":20000, "aa":2, "rightbatch0": 0}
+{"bb":30000, "aa":2, "rightbatch0": 0}
+{"bb":40000, "aa":2, "rightbatch0": 0}
+{"bb":50000, "aa":2, "rightbatch1": 1}
+{"bb":60000, "aa":2, "rightbatch1": 1}
+{"bb":70000, "aa":3, "rightbatch1": 1}
+{"bb":80000, "aa":4, "rightbatch2": 2}
+{"bb":90000, "aa":6, "rightbatch3": 3}
+{"bb":100000, "aa":7, "rightbatch4": 4}
+{"bb":110000, "aa":7, "rightbatch5": 5}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
new file mode 100644
index 0000000..0e4f79d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
@@ -0,0 +1,37 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"json-sub-scan",
+ entries:[
+ {url: "#{LEFT_FILE}"}
+ ]
+ },
+ {
+ @id:2,
+ pop:"json-sub-scan",
+ entries:[
+ {url: "#{RIGHT_FILE}"}
+ ]
+ },
+ {
+ @id: 3,
+ left: 1,
+ right: 2,
+ pop: "merge-join",
+ join-conditions: [ { relationship: "==", left: "a", right: "aa" } ]
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
new file mode 100644
index 0000000..e7bab08
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
@@ -0,0 +1,13 @@
+{"a":1, "b":100}
+{"a":1, "b":200}
+{"a":1, "b":300}
+{"a":2, "b":400}
+{"a":2, "b":500}
+{"a":2, "b":600}
+{"a":3, "b":700}
+{"a":4, "b":800}
+{"a":5, "b":900}
+{"a":6, "b":1000}
+{"a":7, "b":1100}
+{"a":8, "b":1200}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
new file mode 100644
index 0000000..d99a145
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
@@ -0,0 +1,11 @@
+{"bb":10000, "aa":1}
+{"bb":20000, "aa":2}
+{"bb":30000, "aa":2}
+{"bb":40000, "aa":2}
+{"bb":50000, "aa":2}
+{"bb":60000, "aa":2}
+{"bb":70000, "aa":3}
+{"bb":80000, "aa":4}
+{"bb":90000, "aa":6}
+{"bb":100000, "aa":7}
+{"bb":110000, "aa":7}