You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/12/02 05:23:55 UTC

[08/10] git commit: DRILL-301: Join two tables hit IndexOutOfBoundsException

DRILL-301: Join two tables hit IndexOutOfBoundsException


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/316ce8a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/316ce8a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/316ce8a6

Branch: refs/heads/master
Commit: 316ce8a6f8f94c31574e7107be26addcfc92dc7f
Parents: b12c0b1
Author: Ben Becker <be...@gmail.com>
Authored: Sun Dec 1 20:07:45 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:07:45 2013 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/join/JoinTemplate.java   | 28 +++++++++++---------
 .../exec/physical/impl/join/JoinWorker.java     |  2 +-
 .../exec/physical/impl/join/MergeJoinBatch.java | 20 ++++++++------
 3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index b7fdbf3..aae1a3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -83,8 +83,9 @@ public abstract class JoinTemplate implements JoinWorker {
   /**
    * Copy rows from the input record batches until the output record batch is full
    * @param status  State of the join operation (persists across multiple record batches/schema changes)
+   * @return  true if the join succeeded; false if the worker needs to be regenerated
    */
-  public final void doJoin(final JoinStatus status) {
+  public final boolean doJoin(final JoinStatus status) {
     while (true) {
       // for each record
 
@@ -93,14 +94,15 @@ public abstract class JoinTemplate implements JoinWorker {
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) {
           // we've hit the end of the right record batch; copy any remaining values from the left batch
           while (status.isLeftPositionAllowed()) {
-            doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+            if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()))
+              return false;
             status.advanceLeft();
           }
         }
-        return;
+        return true;
       }
       if (!status.isLeftPositionAllowed())
-        return;
+        return true;
 
       int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
       switch (comparison) {
@@ -108,7 +110,9 @@ public abstract class JoinTemplate implements JoinWorker {
       case -1:
         // left key < right key
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT)
-          doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+          if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) {
+            return false;
+          }
         status.advanceLeft();
         continue;
 
@@ -133,10 +137,10 @@ public abstract class JoinTemplate implements JoinWorker {
         do {
           // copy all equal right keys to the output record batch
           if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
-            return;
+            return false;
 
           if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos()))
-            return;
+            return false;
           
           // If the left key has duplicates and we're about to cross a boundary in the right batch, add the
           // right table's record batch to the sv4 builder before calling next.  These records will need to be
@@ -167,7 +171,7 @@ public abstract class JoinTemplate implements JoinWorker {
             status.ok = false;
           }
           // return to indicate recompile in right-sv4 mode
-          return;
+          return true;
         }
 
         continue;
@@ -193,8 +197,8 @@ public abstract class JoinTemplate implements JoinWorker {
   /**
    * Copy the data to the new record batch (if it fits).
    *
-   * @param leftPosition  position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
-   * @param outputPosition position of the output record batch
+   * @param leftIndex  position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
+   * @param outIndex position of the output record batch
    * @return Whether or not the data was copied.
    */
   public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
@@ -205,8 +209,8 @@ public abstract class JoinTemplate implements JoinWorker {
    * Compare the values of the left and right join key to determine whether the left is less than, greater than
    * or equal to the right.
    *
-   * @param leftPosition
-   * @param rightPosition
+   * @param leftIndex
+   * @param rightIndex
    * @return  0 if both keys are equal
    *         -1 if left is < right
    *          1 if left is > right

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 4374cef..8643d66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -30,7 +30,7 @@ public interface JoinWorker {
   }
 
   public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
-  public void doJoin(JoinStatus status);
+  public boolean doJoin(JoinStatus status);
   
   public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1e20e91..f3a32cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -166,7 +166,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       }
 
       // join until we have a complete outgoing batch
-      worker.doJoin(status);
+      if (!worker.doJoin(status))
+        worker = null;
 
       // get the outcome of the join.
       switch(status.getOutcome()){
@@ -353,10 +354,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
                                                        new TypedFieldId(vw.getField().getType(),vectorId));
       // todo: check result of copyFromSafe and grow allocation
-      cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
+      cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
                                    .arg(COPY_LEFT_MAPPING.getValueReadIndex())
                                    .arg(COPY_LEFT_MAPPING.getValueWriteIndex())
-                                   .arg(vvIn));
+                                   .arg(vvIn).eq(JExpr.FALSE))
+          ._then()
+          ._return(JExpr.FALSE);
       ++vectorId;
     }
     cg.getEvalBlock()._return(JExpr.lit(true));
@@ -372,10 +375,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
                                                        new TypedFieldId(vw.getField().getType(),vectorId));
       // todo: check result of copyFromSafe and grow allocation
-      cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
-          .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
-          .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
-          .arg(vvIn));
+      cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+                                 .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
+                                 .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
+                                 .arg(vvIn).eq(JExpr.FALSE))
+          ._then()
+          ._return(JExpr.FALSE);
       ++vectorId;
     }
     cg.getEvalBlock()._return(JExpr.lit(true));
@@ -388,7 +393,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   private void allocateBatch() {
     // allocate new batch space.
     container.clear();
-
     // add fields from both batches
     for (VectorWrapper<?> w : left) {
       ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());