You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2018/12/06 22:40:53 UTC

[drill] 01/04: DRILL-6882: Handle the cases where RowKeyJoin's left pipeline is called multiple times.

This is an automated email from the ASF dual-hosted git repository.

amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f3eef38fd633be28f961139b371f524f5172914f
Author: Hanumath Rao Maduri <hm...@maprtech.com>
AuthorDate: Sat Sep 30 13:26:40 2017 -0700

    DRILL-6882: Handle the cases where RowKeyJoin's left pipeline is called multiple times.
    
    close apache/drill#1562
---
 .../exec/physical/config/IteratorValidator.java    | 14 +++++++--
 .../exec/physical/impl/join/RowKeyJoinBatch.java   |  5 ++++
 .../validate/IteratorValidatorBatchIterator.java   | 20 +++++++++----
 .../impl/validate/IteratorValidatorCreator.java    |  2 +-
 .../impl/validate/IteratorValidatorInjector.java   | 33 +++++++++++++++++++---
 5 files changed, 62 insertions(+), 12 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
index 9fbef97..4f73b00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
@@ -23,10 +23,20 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
 
 public class IteratorValidator extends AbstractSingle{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidator.class);
+  /* isRepeatable flag will be set to true if this validator is created by a Repeatable pipeline.
+   * In a repeatable pipeline some state transitions are valid i.e downstream operator
+   * can call the upstream operator even after receiving NONE.
+   */
+  public final boolean isRepeatable;
 
-  public IteratorValidator(PhysicalOperator child) {
+  public IteratorValidator(PhysicalOperator child, boolean repeatable) {
     super(child);
     setCost(child.getCost());
+    this.isRepeatable = repeatable;
+  }
+
+  public IteratorValidator(PhysicalOperator child) {
+    this(child, false);
   }
 
   @Override
@@ -36,7 +46,7 @@ public class IteratorValidator extends AbstractSingle{
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new IteratorValidator(child);
+    return new IteratorValidator(child, isRepeatable);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index 941f321..2910da5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -132,6 +132,11 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
         return IterOutcome.OK;
       }
 
+      if (rightUpstream == IterOutcome.NONE) {
+        rkJoinState = RowKeyJoinState.DONE;
+        state = BatchState.DONE;
+        return rightUpstream;
+      }
       rightUpstream = next(right);
 
       logger.debug("right input IterOutcome: {}", rightUpstream);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 1ea3895..5c70f5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -47,6 +47,9 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
   /** For logging/debuggability only. */
   private static volatile int instanceCount;
 
+  /** @see org.apache.drill.exec.physical.config.IteratorValidator */
+  private final boolean isRepeatable;
+
   /** For logging/debuggability only. */
   private final int instNum;
   {
@@ -102,12 +105,17 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
    */
   private boolean validateBatches;
 
-  public IteratorValidatorBatchIterator(RecordBatch incoming) {
+  public IteratorValidatorBatchIterator(RecordBatch incoming, boolean isRepeatable) {
     this.incoming = incoming;
     batchTypeName = incoming.getClass().getSimpleName();
+    this.isRepeatable = isRepeatable;
 
     // (Log construction and close() at same level to bracket instance's activity.)
-    logger.trace( "[#{}; on {}]: Being constructed.", instNum, batchTypeName);
+    logger.trace( "[#{}; on {}; repeatable: {}]: Being constructed.", instNum, batchTypeName, isRepeatable);
+  }
+
+  public IteratorValidatorBatchIterator(RecordBatch incoming) {
+    this(incoming, false);
   }
 
 
@@ -217,7 +225,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
                 instNum, batchTypeName, exceptionState, batchState));
       }
       // (Note:  This could use validationState.)
-      if (batchState == NONE || batchState == STOP) {
+      if ((!isRepeatable && batchState == NONE) || batchState == STOP) {
         throw new IllegalStateException(
             String.format(
                 "next() [on #%d, %s] called again after it returned %s."
@@ -256,8 +264,10 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
         case NONE:
           // NONE is allowed even without seeing a OK_NEW_SCHEMA. Such NONE is called
           // FAST NONE.
-          // NONE moves to terminal high-level state.
-          validationState = ValidationState.TERMINAL;
+          // NONE moves to TERMINAL high-level state if NOT repeatable.
+          if (!isRepeatable) {
+            validationState = ValidationState.TERMINAL;
+          }
           break;
         case STOP:
           // STOP is allowed at any time, except if already terminated (checked
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 4dc58e5..b7be8ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -37,7 +37,7 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     RecordBatch child = children.iterator().next();
-    IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child);
+    IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child, config.isRepeatable);
     boolean validateBatches = context.getOptions().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) ||
                               context.getConfig().getBoolean(ExecConstants.ENABLE_VECTOR_VALIDATION);
     iter.enableBatchValidation(validateBatches);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
index 20eba16..6d86fb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
@@ -32,6 +33,17 @@ public class IteratorValidatorInjector extends
     AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, ExecutionSetupException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorInjector.class);
 
+  /* This flag when set creates all the validators as repeatable validators */
+  private final boolean isRepeatablePipeline;
+
+  public IteratorValidatorInjector() {
+    this(false);
+  }
+
+  public IteratorValidatorInjector(boolean repeatablePipeline) {
+    this.isRepeatablePipeline = repeatablePipeline;
+  }
+
   public static FragmentRoot rewritePlanWithIteratorValidator(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     IteratorValidatorInjector inject = new IteratorValidatorInjector();
     PhysicalOperator newOp = root.accept(inject, context);
@@ -60,11 +72,24 @@ public class IteratorValidatorInjector extends
     List<PhysicalOperator> newChildren = Lists.newArrayList();
     PhysicalOperator newOp = op;
 
+    if (op instanceof RowKeyJoinPOP) {
+      /* create a RepeatablePipeline for the left side of RowKeyJoin */
+      PhysicalOperator left = new IteratorValidator(((RowKeyJoinPOP) op).getLeft()
+                                   .accept(new IteratorValidatorInjector(true), context), true);
+      left.setOperatorId(op.getOperatorId() + 1000);
+      newChildren.add(left);
+      /* right pipeline is not repeatable pipeline */
+      PhysicalOperator right = new IteratorValidator(((RowKeyJoinPOP) op).getRight()
+              .accept(this, context));
+      right.setOperatorId(op.getOperatorId() + 1000);
+      newChildren.add(right);
+    } else {
     /* Get the list of child operators */
-    for (PhysicalOperator child : op) {
-      PhysicalOperator validator = new IteratorValidator(child.accept(this, context));
-      validator.setOperatorId(op.getOperatorId() + 1000);
-      newChildren.add(validator);
+      for (PhysicalOperator child : op) {
+        PhysicalOperator validator = new IteratorValidator(child.accept(this, context), this.isRepeatablePipeline);
+        validator.setOperatorId(op.getOperatorId() + 1000);
+        newChildren.add(validator);
+      }
     }
 
     /* Inject trace operator */