You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/12/02 05:23:55 UTC
[08/10] git commit: DRILL-301: Join two tables hit IndexOutOfBoundsException
DRILL-301: Join two tables hit IndexOutOfBoundsException
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/316ce8a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/316ce8a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/316ce8a6
Branch: refs/heads/master
Commit: 316ce8a6f8f94c31574e7107be26addcfc92dc7f
Parents: b12c0b1
Author: Ben Becker <be...@gmail.com>
Authored: Sun Dec 1 20:07:45 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:07:45 2013 -0800
----------------------------------------------------------------------
.../exec/physical/impl/join/JoinTemplate.java | 28 +++++++++++---------
.../exec/physical/impl/join/JoinWorker.java | 2 +-
.../exec/physical/impl/join/MergeJoinBatch.java | 20 ++++++++------
3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index b7fdbf3..aae1a3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -83,8 +83,9 @@ public abstract class JoinTemplate implements JoinWorker {
/**
* Copy rows from the input record batches until the output record batch is full
* @param status State of the join operation (persists across multiple record batches/schema changes)
+ * @return true if the join succeeded; false if the worker needs to be regenerated
*/
- public final void doJoin(final JoinStatus status) {
+ public final boolean doJoin(final JoinStatus status) {
while (true) {
// for each record
@@ -93,14 +94,15 @@ public abstract class JoinTemplate implements JoinWorker {
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
- doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+ if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()))
+ return false;
status.advanceLeft();
}
}
- return;
+ return true;
}
if (!status.isLeftPositionAllowed())
- return;
+ return true;
int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
switch (comparison) {
@@ -108,7 +110,9 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT)
- doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+ if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) {
+ return false;
+ }
status.advanceLeft();
continue;
@@ -133,10 +137,10 @@ public abstract class JoinTemplate implements JoinWorker {
do {
// copy all equal right keys to the output record batch
if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
- return;
+ return false;
if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos()))
- return;
+ return false;
// If the left key has duplicates and we're about to cross a boundary in the right batch, add the
// right table's record batch to the sv4 builder before calling next. These records will need to be
@@ -167,7 +171,7 @@ public abstract class JoinTemplate implements JoinWorker {
status.ok = false;
}
// return to indicate recompile in right-sv4 mode
- return;
+ return true;
}
continue;
@@ -193,8 +197,8 @@ public abstract class JoinTemplate implements JoinWorker {
/**
* Copy the data to the new record batch (if it fits).
*
- * @param leftPosition position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
- * @param outputPosition position of the output record batch
+ * @param leftIndex position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
+ * @param outIndex position of the output record batch
* @return Whether or not the data was copied.
*/
public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
@@ -205,8 +209,8 @@ public abstract class JoinTemplate implements JoinWorker {
* Compare the values of the left and right join key to determine whether the left is less than, greater than
* or equal to the right.
*
- * @param leftPosition
- * @param rightPosition
+ * @param leftIndex
+ * @param rightIndex
* @return 0 if both keys are equal
* -1 if left is < right
* 1 if left is > right
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 4374cef..8643d66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -30,7 +30,7 @@ public interface JoinWorker {
}
public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
- public void doJoin(JoinStatus status);
+ public boolean doJoin(JoinStatus status);
public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1e20e91..f3a32cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -166,7 +166,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
// join until we have a complete outgoing batch
- worker.doJoin(status);
+ if (!worker.doJoin(status))
+ worker = null;
// get the outcome of the join.
switch(status.getOutcome()){
@@ -353,10 +354,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
new TypedFieldId(vw.getField().getType(),vectorId));
// todo: check result of copyFromSafe and grow allocation
- cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
+ cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
.arg(COPY_LEFT_MAPPING.getValueReadIndex())
.arg(COPY_LEFT_MAPPING.getValueWriteIndex())
- .arg(vvIn));
+ .arg(vvIn).eq(JExpr.FALSE))
+ ._then()
+ ._return(JExpr.FALSE);
++vectorId;
}
cg.getEvalBlock()._return(JExpr.lit(true));
@@ -372,10 +375,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
new TypedFieldId(vw.getField().getType(),vectorId));
// todo: check result of copyFromSafe and grow allocation
- cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
- .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
- .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
- .arg(vvIn));
+ cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+ .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
+ .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
+ .arg(vvIn).eq(JExpr.FALSE))
+ ._then()
+ ._return(JExpr.FALSE);
++vectorId;
}
cg.getEvalBlock()._return(JExpr.lit(true));
@@ -388,7 +393,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private void allocateBatch() {
// allocate new batch space.
container.clear();
-
// add fields from both batches
for (VectorWrapper<?> w : left) {
ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());