You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:54:54 UTC
[04/50] [abbrv] asterixdb git commit: working mega overlapping plan.
working mega overlapping plan.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0f533e8b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0f533e8b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0f533e8b
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 0f533e8bbbac7cf633d87013d71370114970c959
Parents: aea7fe8
Author: Preston Carman <pr...@apache.org>
Authored: Tue Jul 5 17:56:26 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Jul 5 17:56:26 2016 -0700
----------------------------------------------------------------------
.../IntervalLocalRangeSplitterOperator.java | 2 +-
.../IntervalLocalRangeOperatorDescriptor.java | 13 +-
.../IntervalLocalRangeSplitterPOperator.java | 38 ++--
.../asterix/optimizer/base/RuleCollections.java | 1 +
.../rules/IntervalSplitPartitioningRule.java | 201 ++++++++++++-------
.../physical/AbstractExchangePOperator.java | 1 +
.../physical/OneToOneExchangePOperator.java | 5 +-
.../algebricks/core/jobgen/impl/JobBuilder.java | 28 +--
8 files changed, 173 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
index 9ae9f7d..181e0fa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
@@ -54,7 +54,7 @@ public class IntervalLocalRangeSplitterOperator extends AbstractExtensibleLogica
@Override
public String toString() {
- return "IntervalLocalRangeSplitterOperator";
+ return "IntervalLocalRangeSplitterOperator " + joinKeyLogicalVars;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
index 392bf43..ca44f78 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
@@ -45,20 +45,21 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri
private static final long serialVersionUID = 1L;
private static final int PARTITION_ACTIVITY_ID = 0;
+ private static final int OUTPUT_ARITY = 3;
+
private static final int INPUT_STARTS = 0;
private static final int INPUT_COVERS = 2;
private static final int INPUT_ENDS = 1;
-// private static final int INPUT_STARTS = 0;
-// private static final int INPUT_COVERS = 0;
-// private static final int INPUT_ENDS = 0;
-
private final int key;
private final IRangeMap rangeMap;
public IntervalLocalRangeOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys,
RecordDescriptor recordDescriptor, IRangeMap rangeMap) {
- super(spec, 1, 3);
+ super(spec, 1, OUTPUT_ARITY);
+ for (int i = 0; i < outputArity; i++) {
+ recordDescriptors[i] = recordDescriptor;
+ }
key = keys[0];
this.rangeMap = rangeMap;
}
@@ -105,7 +106,7 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri
@Override
public void flush() throws HyracksDataException {
for (int i = 0; i < getOutputArity(); i++) {
- resultAppender[i].flush(writers[i]);
+ FrameUtils.flushFrame(resultAppender[i].getBuffer(), writers[i]);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
index 1150b91..ced06aa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
@@ -21,6 +21,7 @@ package org.apache.asterix.algebra.operators.physical;
import java.util.List;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -28,6 +29,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -55,9 +57,13 @@ public class IntervalLocalRangeSplitterPOperator extends AbstractPhysicalOperato
}
@Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ public String toString() {
+ return "IntervalLocalRangeSplitterPOperator " + intervalFields;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
}
@Override
@@ -67,35 +73,37 @@ public class IntervalLocalRangeSplitterPOperator extends AbstractPhysicalOperato
}
@Override
- public String toString() {
- return "IntervalLocalRangeSplitterPOperator " + intervalFields;
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
}
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- int[] keys = JobGenHelper.variablesToFieldIndexes(intervalFields, inputSchemas[0]);
-
IOperatorDescriptorRegistry spec = builder.getJobSpec();
- RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
- context);
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
- IOperatorDescriptor opDesc = new IntervalLocalRangeOperatorDescriptor(spec, keys, recordDescriptor, rangeMap);
+ int[] keys = JobGenHelper.variablesToFieldIndexes(intervalFields, inputSchemas[0]);
+
+ IOperatorDescriptor opDesc = new IntervalLocalRangeOperatorDescriptor(spec, keys, recDescriptor, rangeMap);
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
- // and contribute one edge from its child
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
}
@Override
- public boolean isMicroOperator() {
- return false;
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ ReplicateOperator rop = (ReplicateOperator) op;
+ int[] inputDependencyLabels = new int[] { 0 };
+ int[] outputDependencyLabels = new int[rop.getOutputArity()]; // filled with 0's
+ return new Pair<>(inputDependencyLabels, outputDependencyLabels);
}
@Override
public boolean expensiveThanMaterialization() {
return false;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 5fcfc94..9a3a125 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -342,4 +342,5 @@ public final class RuleCollections {
prepareForJobGenRewrites.add(new SweepIllegalNonfunctionalFunctions());
return prepareForJobGenRewrites;
}
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
index a6f49f9..2772e68 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -51,12 +51,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogi
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.MaterializePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
@@ -122,17 +124,19 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
ILogicalOperator op = opRef.getValue();
+ if (context.checkIfInDontApplySet(this, op)) {
+ return false;
+ }
if (!isIntervalJoin(op)) {
return false;
}
- InnerJoinOperator startsJoin = (InnerJoinOperator) op;
- ExecutionMode mode = startsJoin.getExecutionMode();
- Mutable<ILogicalOperator> startsJoinRef = opRef;
+ InnerJoinOperator originalIntervalJoin = (InnerJoinOperator) op;
+ ExecutionMode mode = originalIntervalJoin.getExecutionMode();
Set<LogicalVariable> localLiveVars = new ListSet<>();
- VariableUtilities.getLiveVariables(op, localLiveVars);
+ VariableUtilities.getLiveVariables(originalIntervalJoin, localLiveVars);
- Mutable<ILogicalOperator> leftSortedInput = op.getInputs().get(0);
- Mutable<ILogicalOperator> rightSortedInput = op.getInputs().get(1);
+ Mutable<ILogicalOperator> leftSortedInput = originalIntervalJoin.getInputs().get(0);
+ Mutable<ILogicalOperator> rightSortedInput = originalIntervalJoin.getInputs().get(1);
if (leftSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE
&& rightSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
return false;
@@ -160,14 +164,14 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
// TODO check physical join
// Interval local partition operators
- LogicalVariable leftJoinKey = getJoinKey(startsJoin.getCondition().getValue(), LEFT);
- LogicalVariable rightJoinKey = getJoinKey(startsJoin.getCondition().getValue(), RIGHT);
+ LogicalVariable leftJoinKey = getJoinKey(originalIntervalJoin.getCondition().getValue(), LEFT);
+ LogicalVariable rightJoinKey = getJoinKey(originalIntervalJoin.getCondition().getValue(), RIGHT);
if (leftJoinKey == null || rightJoinKey == null) {
return false;
}
- ILogicalOperator leftIntervalSplit = getIntervalSplitOperator(leftSortKey, rightRangeMap, mode);
+ ReplicateOperator leftIntervalSplit = getIntervalSplitOperator(leftSortKey, leftRangeMap, mode);
Mutable<ILogicalOperator> leftIntervalSplitRef = new MutableObject<>(leftIntervalSplit);
- ILogicalOperator rightIntervalSplit = getIntervalSplitOperator(rightSortKey, rightRangeMap, mode);
+ ReplicateOperator rightIntervalSplit = getIntervalSplitOperator(rightSortKey, rightRangeMap, mode);
Mutable<ILogicalOperator> rightIntervalSplitRef = new MutableObject<>(rightIntervalSplit);
// Replicate operators
@@ -177,20 +181,36 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
Mutable<ILogicalOperator> rightStartsSplitRef = new MutableObject<>(rightStartsSplit);
// Covers Join Operator
- ILogicalOperator leftCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode);
+ ILogicalOperator leftCoversJoin = getNestedLoop(originalIntervalJoin.getCondition(), context, mode);
Mutable<ILogicalOperator> leftCoversJoinRef = new MutableObject<>(leftCoversJoin);
- ILogicalOperator rightCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode);
+ ILogicalOperator rightCoversJoin = getNestedLoop(originalIntervalJoin.getCondition(), context, mode);
Mutable<ILogicalOperator> rightCoversJoinRef = new MutableObject<>(rightCoversJoin);
// Ends Join Operator
- ILogicalOperator leftEndsJoin = getIntervalJoin(startsJoin, context, mode);
- ILogicalOperator rightEndsJoin = getIntervalJoin(startsJoin, context, mode);
- if (leftEndsJoin == null || rightEndsJoin == null) {
+ ILogicalOperator startsJoin = getIntervalJoin(originalIntervalJoin, context, mode);
+ ILogicalOperator leftEndsJoin = getIntervalJoin(originalIntervalJoin, context, mode);
+ ILogicalOperator rightEndsJoin = getIntervalJoin(originalIntervalJoin, context, mode);
+ if (startsJoin == null || leftEndsJoin == null || rightEndsJoin == null) {
return false;
}
+ Mutable<ILogicalOperator> startsJoinRef = new MutableObject<>(startsJoin);
Mutable<ILogicalOperator> leftEndsJoinRef = new MutableObject<>(leftEndsJoin);
Mutable<ILogicalOperator> rightEndsJoinRef = new MutableObject<>(rightEndsJoin);
+ // Materialize Operator
+ ILogicalOperator leftMaterialize0 = getMaterializeOperator(mode);
+ Mutable<ILogicalOperator> leftMaterialize0Ref = new MutableObject<>(leftMaterialize0);
+ ILogicalOperator leftMaterialize1 = getMaterializeOperator(mode);
+ Mutable<ILogicalOperator> leftMaterialize1Ref = new MutableObject<>(leftMaterialize1);
+ ILogicalOperator leftMaterialize2 = getMaterializeOperator(mode);
+ Mutable<ILogicalOperator> leftMaterialize2Ref = new MutableObject<>(leftMaterialize2);
+ ILogicalOperator rightMaterialize0 = getMaterializeOperator(mode);
+ Mutable<ILogicalOperator> rightMaterialize0Ref = new MutableObject<>(rightMaterialize0);
+ ILogicalOperator rightMaterialize1 = getMaterializeOperator(mode);
+ Mutable<ILogicalOperator> rightMaterialize1Ref = new MutableObject<>(rightMaterialize1);
+ ILogicalOperator rightMaterialize2 = getMaterializeOperator(mode);
+ Mutable<ILogicalOperator> rightMaterialize2Ref = new MutableObject<>(rightMaterialize2);
+
// Union All Operator
ILogicalOperator union1 = getUnionOperator(localLiveVars, mode);
Mutable<ILogicalOperator> union1Ref = new MutableObject<>(union1);
@@ -201,59 +221,84 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
ILogicalOperator union4 = getUnionOperator(localLiveVars, mode);
Mutable<ILogicalOperator> union4Ref = new MutableObject<>(union4);
+ // Remove old path
+ originalIntervalJoin.getInputs().clear();
+
// Connect main path
connectOperators(leftIntervalSplitRef, leftSortedInput, context);
- context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplit);
- connectOperators(leftStartsSplitRef, leftIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(leftStartsSplit);
+ context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplitRef.getValue());
+ connectOperators(leftMaterialize0Ref, leftIntervalSplitRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(leftMaterialize0Ref.getValue());
+ connectOperators(leftMaterialize1Ref, leftIntervalSplitRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(leftMaterialize1Ref.getValue());
+ connectOperators(leftMaterialize2Ref, leftIntervalSplitRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(leftMaterialize2Ref.getValue());
+
+ connectOperators(leftStartsSplitRef, leftMaterialize0Ref, context);
+ context.computeAndSetTypeEnvironmentForOperator(leftStartsSplitRef.getValue());
+
connectOperators(rightIntervalSplitRef, rightSortedInput, context);
- context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplit);
- connectOperators(rightStartsSplitRef, rightIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(rightStartsSplit);
- updateConnections(startsJoinRef, leftStartsSplitRef, context, LEFT);
- updateConnections(startsJoinRef, rightStartsSplitRef, context, RIGHT);
- context.computeAndSetTypeEnvironmentForOperator(startsJoin);
- leftStartsSplit.getOutputs().add(startsJoinRef);
- rightStartsSplit.getOutputs().add(startsJoinRef);
+ context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplitRef.getValue());
+ connectOperators(rightMaterialize0Ref, rightIntervalSplitRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(rightMaterialize0Ref.getValue());
+ connectOperators(rightMaterialize1Ref, rightIntervalSplitRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(rightMaterialize1Ref.getValue());
+ connectOperators(rightMaterialize2Ref, rightIntervalSplitRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(rightMaterialize2Ref.getValue());
+
+ connectOperators(rightStartsSplitRef, rightMaterialize0Ref, context);
+ context.computeAndSetTypeEnvironmentForOperator(rightStartsSplitRef.getValue());
+
+ // Connect left and right starts path
+ connectOperators(startsJoinRef, leftStartsSplitRef, context);
+ connectOperators(startsJoinRef, rightStartsSplitRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(startsJoinRef.getValue());
// Connect left ends path
- connectOperators(leftEndsJoinRef, leftIntervalSplitRef, context);
+ connectOperators(leftEndsJoinRef, leftMaterialize1Ref, context);
connectOperators(leftEndsJoinRef, rightStartsSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(leftEndsJoin);
- connectOperators(union1Ref, leftEndsJoinRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(leftEndsJoinRef.getValue());
connectOperators(union1Ref, startsJoinRef, context);
- context.computeAndSetTypeEnvironmentForOperator(union1);
- rightStartsSplit.getOutputs().add(leftEndsJoinRef);
+ connectOperators(union1Ref, leftEndsJoinRef, context);
+ context.computeAndSetTypeEnvironmentForOperator(union1Ref.getValue());
// Connect left covers path
- connectOperators(leftCoversJoinRef, leftIntervalSplitRef, context);
+ connectOperators(leftCoversJoinRef, leftMaterialize2Ref, context);
connectOperators(leftCoversJoinRef, rightStartsSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(leftCoversJoin);
+ context.computeAndSetTypeEnvironmentForOperator(leftCoversJoinRef.getValue());
connectOperators(union2Ref, union1Ref, context);
connectOperators(union2Ref, leftCoversJoinRef, context);
- context.computeAndSetTypeEnvironmentForOperator(union2);
- rightStartsSplit.getOutputs().add(leftCoversJoinRef);
+ context.computeAndSetTypeEnvironmentForOperator(union2Ref.getValue());
// Connect right ends path
connectOperators(rightEndsJoinRef, leftStartsSplitRef, context);
- connectOperators(rightEndsJoinRef, rightIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(rightEndsJoin);
+ connectOperators(rightEndsJoinRef, rightMaterialize1Ref, context);
+ context.computeAndSetTypeEnvironmentForOperator(rightEndsJoinRef.getValue());
connectOperators(union3Ref, union2Ref, context);
connectOperators(union3Ref, rightEndsJoinRef, context);
- context.computeAndSetTypeEnvironmentForOperator(union3);
- leftStartsSplit.getOutputs().add(rightEndsJoinRef);
+ context.computeAndSetTypeEnvironmentForOperator(union3Ref.getValue());
// Connect right covers path
connectOperators(rightCoversJoinRef, leftStartsSplitRef, context);
- connectOperators(rightCoversJoinRef, rightIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(rightCoversJoin);
+ connectOperators(rightCoversJoinRef, rightMaterialize2Ref, context);
+ context.computeAndSetTypeEnvironmentForOperator(rightCoversJoinRef.getValue());
connectOperators(union4Ref, union3Ref, context);
connectOperators(union4Ref, rightCoversJoinRef, context);
- context.computeAndSetTypeEnvironmentForOperator(union4);
- leftStartsSplit.getOutputs().add(rightCoversJoinRef);
+ context.computeAndSetTypeEnvironmentForOperator(union4Ref.getValue());
// Update context
- opRef.setValue(union4);
+ opRef.setValue(union4Ref.getValue());
+
+ context.addToDontApplySet(this, startsJoin);
+ context.addToDontApplySet(this, leftCoversJoin);
+ context.addToDontApplySet(this, rightCoversJoin);
+ context.addToDontApplySet(this, leftCoversJoin);
+ context.addToDontApplySet(this, rightEndsJoin);
+
+ context.addToDontApplySet(this, union1);
+ context.addToDontApplySet(this, union2);
+ context.addToDontApplySet(this, union3);
+ context.addToDontApplySet(this, union4);
return true;
}
@@ -274,7 +319,7 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
return null;
}
- // Check whether the function is a function we want to push.
+ // Check whether the function is a function we want to alter.
AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
if (!intervalJoinConditions.contains(funcExpr.getFunctionIdentifier())) {
return null;
@@ -286,33 +331,26 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
return null;
}
- private void connectOperators(Mutable<ILogicalOperator> from, Mutable<ILogicalOperator> to,
+ private void connectOperators(Mutable<ILogicalOperator> child, Mutable<ILogicalOperator> parent,
IOptimizationContext context) throws AlgebricksException {
- if (to.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
- ILogicalOperator eo = getExchangeOperator(from.getValue().getExecutionMode());
+ if (parent.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
+ ILogicalOperator eo = getExchangeOperator(child.getValue().getExecutionMode());
Mutable<ILogicalOperator> eoRef = new MutableObject<>(eo);
- eo.getInputs().add(to);
- from.getValue().getInputs().add(eoRef);
- context.computeAndSetTypeEnvironmentForOperator(eo);
- context.computeAndSetTypeEnvironmentForOperator(from.getValue());
- } else {
- from.getValue().getInputs().add(to);
- context.computeAndSetTypeEnvironmentForOperator(from.getValue());
- }
- }
-
- private void updateConnections(Mutable<ILogicalOperator> from, Mutable<ILogicalOperator> to,
- IOptimizationContext context, int index) throws AlgebricksException {
- if (from.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
- ILogicalOperator eo = getExchangeOperator(from.getValue().getExecutionMode());
- Mutable<ILogicalOperator> eoRef = new MutableObject<>(eo);
- eo.getInputs().add(to);
- from.getValue().getInputs().set(index, eoRef);
- context.computeAndSetTypeEnvironmentForOperator(from.getValue());
+ eo.getInputs().add(parent);
+ if (parent.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+ ReplicateOperator ro = (ReplicateOperator) parent.getValue();
+ ro.getOutputs().add(eoRef);
+ }
+ child.getValue().getInputs().add(eoRef);
context.computeAndSetTypeEnvironmentForOperator(eo);
+ context.computeAndSetTypeEnvironmentForOperator(child.getValue());
} else {
- from.getValue().getInputs().set(index, to);
- context.computeAndSetTypeEnvironmentForOperator(from.getValue());
+ if (parent.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+ ReplicateOperator ro = (ReplicateOperator) parent.getValue();
+ ro.getOutputs().add(child);
+ }
+ child.getValue().getInputs().add(parent);
+ context.computeAndSetTypeEnvironmentForOperator(child.getValue());
}
}
@@ -323,25 +361,28 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
return eo;
}
- private ILogicalOperator getIntervalSplitOperator(LogicalVariable key, IRangeMap rangeMap, ExecutionMode mode) {
+ private ReplicateOperator getIntervalSplitOperator(LogicalVariable key, IRangeMap rangeMap, ExecutionMode mode) {
List<LogicalVariable> joinKeyLogicalVars = new ArrayList<>();
joinKeyLogicalVars.add(key);
//create the logical and physical operator
- IntervalLocalRangeSplitterOperator splitOperator = new IntervalLocalRangeSplitterOperator(joinKeyLogicalVars);
+ boolean[] flags = new boolean[2];
+ for (int i = 0; i < flags.length; ++i) {
+ flags[i] = true;
+ }
+ ReplicateOperator splitOperator = new ReplicateOperator(flags.length, flags);
+ // ReplicatePOperator splitPOperator = new ReplicatePOperator();
IntervalLocalRangeSplitterPOperator splitPOperator = new IntervalLocalRangeSplitterPOperator(joinKeyLogicalVars,
rangeMap);
splitOperator.setPhysicalOperator(splitPOperator);
splitOperator.setExecutionMode(mode);
-
- //create ExtensionOperator and put the commitOperator in it.
- ExtensionOperator extensionOperator = new ExtensionOperator(splitOperator);
- extensionOperator.setPhysicalOperator(splitPOperator);
- extensionOperator.setExecutionMode(mode);
- return extensionOperator;
+ return splitOperator;
}
private ReplicateOperator getReplicateOperator(int outputArity, ExecutionMode mode) {
boolean[] flags = new boolean[outputArity];
+ for (int i = 0; i < flags.length; ++i) {
+ flags[i] = true;
+ }
ReplicateOperator ro = new ReplicateOperator(flags.length, flags);
ReplicatePOperator rpo = new ReplicatePOperator();
ro.setPhysicalOperator(rpo);
@@ -349,6 +390,14 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
return ro;
}
+ private ILogicalOperator getMaterializeOperator(ExecutionMode mode) {
+ MaterializeOperator mo = new MaterializeOperator();
+ MaterializePOperator mpo = new MaterializePOperator(false);
+ mo.setPhysicalOperator(mpo);
+ mo.setExecutionMode(mode);
+ return mo;
+ }
+
private ILogicalOperator getNestedLoop(Mutable<ILogicalExpression> condition, IOptimizationContext context,
ExecutionMode mode) {
int memoryJoinSize = context.getPhysicalOptimizationConfig().getMaxFramesForJoin();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
index aad0cf3..799a6af 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
public abstract class AbstractExchangePOperator extends AbstractPhysicalOperator {
+ @Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
index 818e1ec..083e4d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
@@ -35,9 +35,6 @@ import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
public class OneToOneExchangePOperator extends AbstractExchangePOperator {
- public OneToOneExchangePOperator() {
- }
-
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE;
@@ -59,7 +56,7 @@ public class OneToOneExchangePOperator extends AbstractExchangePOperator {
public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
- return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.SAME_COUNT);
+ return new Pair<>(conn, TargetConstraint.SAME_COUNT);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index af40250..a1c6164 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -49,19 +49,19 @@ public class JobBuilder implements IHyracksJobBuilder {
private final AlgebricksAbsolutePartitionConstraint clusterLocations;
private final AlgebricksAbsolutePartitionConstraint countOneLocation;
- private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> outEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>();
- private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> inEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>();
- private final Map<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>> connectors = new HashMap<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>>();
+ private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> outEdges = new HashMap<>();
+ private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> inEdges = new HashMap<>();
+ private final Map<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>> connectors = new HashMap<>();
- private final Map<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>> microOps = new HashMap<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>>();
- private final Map<IPushRuntimeFactory, ILogicalOperator> revMicroOpMap = new HashMap<IPushRuntimeFactory, ILogicalOperator>();
- private final Map<ILogicalOperator, IOperatorDescriptor> hyracksOps = new HashMap<ILogicalOperator, IOperatorDescriptor>();
- private final Map<ILogicalOperator, AlgebricksPartitionConstraint> pcForMicroOps = new HashMap<ILogicalOperator, AlgebricksPartitionConstraint>();
+ private final Map<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>> microOps = new HashMap<>();
+ private final Map<IPushRuntimeFactory, ILogicalOperator> revMicroOpMap = new HashMap<>();
+ private final Map<ILogicalOperator, IOperatorDescriptor> hyracksOps = new HashMap<>();
+ private final Map<ILogicalOperator, AlgebricksPartitionConstraint> pcForMicroOps = new HashMap<>();
- private final Map<ILogicalOperator, Integer> algebraicOpBelongingToMetaAsterixOp = new HashMap<ILogicalOperator, Integer>();
- private final Map<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>> metaAsterixOpSkeletons = new HashMap<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>>();
- private final Map<Integer, AlgebricksMetaOperatorDescriptor> metaAsterixOps = new HashMap<Integer, AlgebricksMetaOperatorDescriptor>();
- private final Map<IOperatorDescriptor, AlgebricksPartitionConstraint> partitionConstraintMap = new HashMap<IOperatorDescriptor, AlgebricksPartitionConstraint>();
+ private final Map<ILogicalOperator, Integer> algebraicOpBelongingToMetaAsterixOp = new HashMap<>();
+ private final Map<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>> metaAsterixOpSkeletons = new HashMap<>();
+ private final Map<Integer, AlgebricksMetaOperatorDescriptor> metaAsterixOps = new HashMap<>();
+ private final Map<IOperatorDescriptor, AlgebricksPartitionConstraint> partitionConstraintMap = new HashMap<>();
private int aodCounter = 0;
@@ -123,14 +123,14 @@ public class JobBuilder implements IHyracksJobBuilder {
int destInputIndex) {
ArrayList<ILogicalOperator> outputs = outEdges.get(src);
if (outputs == null) {
- outputs = new ArrayList<ILogicalOperator>();
+ outputs = new ArrayList<>();
outEdges.put(src, outputs);
}
addAtPos(outputs, dest, srcOutputIndex);
ArrayList<ILogicalOperator> inp = inEdges.get(dest);
if (inp == null) {
- inp = new ArrayList<ILogicalOperator>();
+ inp = new ArrayList<>();
inEdges.put(dest, inp);
}
addAtPos(inp, src, destInputIndex);
@@ -270,7 +270,7 @@ public class JobBuilder implements IHyracksJobBuilder {
}
private Map<IConnectorDescriptor, TargetConstraint> setupConnectors() throws AlgebricksException {
- Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = new HashMap<IConnectorDescriptor, TargetConstraint>();
+ Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = new HashMap<>();
for (ILogicalOperator exchg : connectors.keySet()) {
ILogicalOperator inOp = inEdges.get(exchg).get(0);
ILogicalOperator outOp = outEdges.get(exchg).get(0);