You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2018/12/06 22:40:53 UTC
[drill] 01/04: DRILL-6882: Handle the cases where RowKeyJoin's left
pipeline is called multiple times.
This is an automated email from the ASF dual-hosted git repository.
amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit f3eef38fd633be28f961139b371f524f5172914f
Author: Hanumath Rao Maduri <hm...@maprtech.com>
AuthorDate: Sat Sep 30 13:26:40 2017 -0700
DRILL-6882: Handle the cases where RowKeyJoin's left pipeline is called multiple times.
close apache/drill#1562
---
.../exec/physical/config/IteratorValidator.java | 14 +++++++--
.../exec/physical/impl/join/RowKeyJoinBatch.java | 5 ++++
.../validate/IteratorValidatorBatchIterator.java | 20 +++++++++----
.../impl/validate/IteratorValidatorCreator.java | 2 +-
.../impl/validate/IteratorValidatorInjector.java | 33 +++++++++++++++++++---
5 files changed, 62 insertions(+), 12 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
index 9fbef97..4f73b00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
@@ -23,10 +23,20 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
public class IteratorValidator extends AbstractSingle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidator.class);
+ /* The isRepeatable flag is set to true if this validator is created for a repeatable pipeline.
+ * In a repeatable pipeline some additional state transitions are valid, i.e., a downstream
+ * operator can call the upstream operator again even after receiving NONE.
+ */
+ public final boolean isRepeatable;
- public IteratorValidator(PhysicalOperator child) {
+ public IteratorValidator(PhysicalOperator child, boolean repeatable) {
super(child);
setCost(child.getCost());
+ this.isRepeatable = repeatable;
+ }
+
+ public IteratorValidator(PhysicalOperator child) {
+ this(child, false);
}
@Override
@@ -36,7 +46,7 @@ public class IteratorValidator extends AbstractSingle{
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new IteratorValidator(child);
+ return new IteratorValidator(child, isRepeatable);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index 941f321..2910da5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -132,6 +132,11 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
return IterOutcome.OK;
}
+ if (rightUpstream == IterOutcome.NONE) {
+ rkJoinState = RowKeyJoinState.DONE;
+ state = BatchState.DONE;
+ return rightUpstream;
+ }
rightUpstream = next(right);
logger.debug("right input IterOutcome: {}", rightUpstream);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 1ea3895..5c70f5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -47,6 +47,9 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
/** For logging/debuggability only. */
private static volatile int instanceCount;
+ /** @see org.apache.drill.exec.physical.config.IteratorValidator */
+ private final boolean isRepeatable;
+
/** For logging/debuggability only. */
private final int instNum;
{
@@ -102,12 +105,17 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
*/
private boolean validateBatches;
- public IteratorValidatorBatchIterator(RecordBatch incoming) {
+ public IteratorValidatorBatchIterator(RecordBatch incoming, boolean isRepeatable) {
this.incoming = incoming;
batchTypeName = incoming.getClass().getSimpleName();
+ this.isRepeatable = isRepeatable;
// (Log construction and close() at same level to bracket instance's activity.)
- logger.trace( "[#{}; on {}]: Being constructed.", instNum, batchTypeName);
+ logger.trace( "[#{}; on {}; repeatable: {}]: Being constructed.", instNum, batchTypeName, isRepeatable);
+ }
+
+ public IteratorValidatorBatchIterator(RecordBatch incoming) {
+ this(incoming, false);
}
@@ -217,7 +225,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
instNum, batchTypeName, exceptionState, batchState));
}
// (Note: This could use validationState.)
- if (batchState == NONE || batchState == STOP) {
+ if ((!isRepeatable && batchState == NONE) || batchState == STOP) {
throw new IllegalStateException(
String.format(
"next() [on #%d, %s] called again after it returned %s."
@@ -256,8 +264,10 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
case NONE:
// NONE is allowed even without seeing a OK_NEW_SCHEMA. Such NONE is called
// FAST NONE.
- // NONE moves to terminal high-level state.
- validationState = ValidationState.TERMINAL;
+ // NONE moves to TERMINAL high-level state if NOT repeatable.
+ if (!isRepeatable) {
+ validationState = ValidationState.TERMINAL;
+ }
break;
case STOP:
// STOP is allowed at any time, except if already terminated (checked
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 4dc58e5..b7be8ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -37,7 +37,7 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
RecordBatch child = children.iterator().next();
- IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child);
+ IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child, config.isRepeatable);
boolean validateBatches = context.getOptions().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) ||
context.getConfig().getBoolean(ExecConstants.ENABLE_VECTOR_VALIDATION);
iter.enableBatchValidation(validateBatches);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
index 20eba16..6d86fb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -32,6 +33,17 @@ public class IteratorValidatorInjector extends
AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, ExecutionSetupException> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorInjector.class);
+ /* When set, this flag causes all validators to be created as repeatable validators. */
+ private final boolean isRepeatablePipeline;
+
+ public IteratorValidatorInjector() {
+ this(false);
+ }
+
+ public IteratorValidatorInjector(boolean repeatablePipeline) {
+ this.isRepeatablePipeline = repeatablePipeline;
+ }
+
public static FragmentRoot rewritePlanWithIteratorValidator(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
IteratorValidatorInjector inject = new IteratorValidatorInjector();
PhysicalOperator newOp = root.accept(inject, context);
@@ -60,11 +72,24 @@ public class IteratorValidatorInjector extends
List<PhysicalOperator> newChildren = Lists.newArrayList();
PhysicalOperator newOp = op;
+ if (op instanceof RowKeyJoinPOP) {
+ /* create a RepeatablePipeline for the left side of RowKeyJoin */
+ PhysicalOperator left = new IteratorValidator(((RowKeyJoinPOP) op).getLeft()
+ .accept(new IteratorValidatorInjector(true), context), true);
+ left.setOperatorId(op.getOperatorId() + 1000);
+ newChildren.add(left);
+ /* the right pipeline is not a repeatable pipeline */
+ PhysicalOperator right = new IteratorValidator(((RowKeyJoinPOP) op).getRight()
+ .accept(this, context));
+ right.setOperatorId(op.getOperatorId() + 1000);
+ newChildren.add(right);
+ } else {
/* Get the list of child operators */
- for (PhysicalOperator child : op) {
- PhysicalOperator validator = new IteratorValidator(child.accept(this, context));
- validator.setOperatorId(op.getOperatorId() + 1000);
- newChildren.add(validator);
+ for (PhysicalOperator child : op) {
+ PhysicalOperator validator = new IteratorValidator(child.accept(this, context), this.isRepeatablePipeline);
+ validator.setOperatorId(op.getOperatorId() + 1000);
+ newChildren.add(validator);
+ }
}
/* Inject trace operator */