You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:54:54 UTC

[04/50] [abbrv] asterixdb git commit: working mega overlapping plan.

working mega overlapping plan.


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0f533e8b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0f533e8b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0f533e8b

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 0f533e8bbbac7cf633d87013d71370114970c959
Parents: aea7fe8
Author: Preston Carman <pr...@apache.org>
Authored: Tue Jul 5 17:56:26 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Jul 5 17:56:26 2016 -0700

----------------------------------------------------------------------
 .../IntervalLocalRangeSplitterOperator.java     |   2 +-
 .../IntervalLocalRangeOperatorDescriptor.java   |  13 +-
 .../IntervalLocalRangeSplitterPOperator.java    |  38 ++--
 .../asterix/optimizer/base/RuleCollections.java |   1 +
 .../rules/IntervalSplitPartitioningRule.java    | 201 ++++++++++++-------
 .../physical/AbstractExchangePOperator.java     |   1 +
 .../physical/OneToOneExchangePOperator.java     |   5 +-
 .../algebricks/core/jobgen/impl/JobBuilder.java |  28 +--
 8 files changed, 173 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
index 9ae9f7d..181e0fa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
@@ -54,7 +54,7 @@ public class IntervalLocalRangeSplitterOperator extends AbstractExtensibleLogica
 
     @Override
     public String toString() {
-        return "IntervalLocalRangeSplitterOperator";
+        return "IntervalLocalRangeSplitterOperator " + joinKeyLogicalVars;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
index 392bf43..ca44f78 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
@@ -45,20 +45,21 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri
     private static final long serialVersionUID = 1L;
     private static final int PARTITION_ACTIVITY_ID = 0;
 
+    private static final int OUTPUT_ARITY = 3;
+
     private static final int INPUT_STARTS = 0;
     private static final int INPUT_COVERS = 2;
     private static final int INPUT_ENDS = 1;
 
-//    private static final int INPUT_STARTS = 0;
-//    private static final int INPUT_COVERS = 0;
-//    private static final int INPUT_ENDS = 0;
-
     private final int key;
     private final IRangeMap rangeMap;
 
     public IntervalLocalRangeOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys,
             RecordDescriptor recordDescriptor, IRangeMap rangeMap) {
-        super(spec, 1, 3);
+        super(spec, 1, OUTPUT_ARITY);
+        for (int i = 0; i < outputArity; i++) {
+            recordDescriptors[i] = recordDescriptor;
+        }
         key = keys[0];
         this.rangeMap = rangeMap;
     }
@@ -105,7 +106,7 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri
                 @Override
                 public void flush() throws HyracksDataException {
                     for (int i = 0; i < getOutputArity(); i++) {
-                        resultAppender[i].flush(writers[i]);
+                        FrameUtils.flushFrame(resultAppender[i].getBuffer(), writers[i]);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
index 1150b91..ced06aa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
@@ -21,6 +21,7 @@ package org.apache.asterix.algebra.operators.physical;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -28,6 +29,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -55,9 +57,13 @@ public class IntervalLocalRangeSplitterPOperator extends AbstractPhysicalOperato
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    public String toString() {
+        return "IntervalLocalRangeSplitterPOperator " + intervalFields;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
     }
 
     @Override
@@ -67,35 +73,37 @@ public class IntervalLocalRangeSplitterPOperator extends AbstractPhysicalOperato
     }
 
     @Override
-    public String toString() {
-        return "IntervalLocalRangeSplitterPOperator " + intervalFields;
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
     }
 
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        int[] keys = JobGenHelper.variablesToFieldIndexes(intervalFields, inputSchemas[0]);
-
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
-                context);
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
 
-        IOperatorDescriptor opDesc = new IntervalLocalRangeOperatorDescriptor(spec, keys, recordDescriptor, rangeMap);
+        int[] keys = JobGenHelper.variablesToFieldIndexes(intervalFields, inputSchemas[0]);
+
+        IOperatorDescriptor opDesc = new IntervalLocalRangeOperatorDescriptor(spec, keys, recDescriptor, rangeMap);
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
-        // and contribute one edge from its child
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
 
     @Override
-    public boolean isMicroOperator() {
-        return false;
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        ReplicateOperator rop = (ReplicateOperator) op;
+        int[] inputDependencyLabels = new int[] { 0 };
+        int[] outputDependencyLabels = new int[rop.getOutputArity()]; // filled with 0's
+        return new Pair<>(inputDependencyLabels, outputDependencyLabels);
     }
 
     @Override
     public boolean expensiveThanMaterialization() {
         return false;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 5fcfc94..9a3a125 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -342,4 +342,5 @@ public final class RuleCollections {
         prepareForJobGenRewrites.add(new SweepIllegalNonfunctionalFunctions());
         return prepareForJobGenRewrites;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
index a6f49f9..2772e68 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -51,12 +51,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogi
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.MaterializePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
@@ -122,17 +124,19 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         ILogicalOperator op = opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
         if (!isIntervalJoin(op)) {
             return false;
         }
-        InnerJoinOperator startsJoin = (InnerJoinOperator) op;
-        ExecutionMode mode = startsJoin.getExecutionMode();
-        Mutable<ILogicalOperator> startsJoinRef = opRef;
+        InnerJoinOperator originalIntervalJoin = (InnerJoinOperator) op;
+        ExecutionMode mode = originalIntervalJoin.getExecutionMode();
         Set<LogicalVariable> localLiveVars = new ListSet<>();
-        VariableUtilities.getLiveVariables(op, localLiveVars);
+        VariableUtilities.getLiveVariables(originalIntervalJoin, localLiveVars);
 
-        Mutable<ILogicalOperator> leftSortedInput = op.getInputs().get(0);
-        Mutable<ILogicalOperator> rightSortedInput = op.getInputs().get(1);
+        Mutable<ILogicalOperator> leftSortedInput = originalIntervalJoin.getInputs().get(0);
+        Mutable<ILogicalOperator> rightSortedInput = originalIntervalJoin.getInputs().get(1);
         if (leftSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE
                 && rightSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
             return false;
@@ -160,14 +164,14 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
         // TODO check physical join
 
         // Interval local partition operators
-        LogicalVariable leftJoinKey = getJoinKey(startsJoin.getCondition().getValue(), LEFT);
-        LogicalVariable rightJoinKey = getJoinKey(startsJoin.getCondition().getValue(), RIGHT);
+        LogicalVariable leftJoinKey = getJoinKey(originalIntervalJoin.getCondition().getValue(), LEFT);
+        LogicalVariable rightJoinKey = getJoinKey(originalIntervalJoin.getCondition().getValue(), RIGHT);
         if (leftJoinKey == null || rightJoinKey == null) {
             return false;
         }
-        ILogicalOperator leftIntervalSplit = getIntervalSplitOperator(leftSortKey, rightRangeMap, mode);
+        ReplicateOperator leftIntervalSplit = getIntervalSplitOperator(leftSortKey, leftRangeMap, mode);
         Mutable<ILogicalOperator> leftIntervalSplitRef = new MutableObject<>(leftIntervalSplit);
-        ILogicalOperator rightIntervalSplit = getIntervalSplitOperator(rightSortKey, rightRangeMap, mode);
+        ReplicateOperator rightIntervalSplit = getIntervalSplitOperator(rightSortKey, rightRangeMap, mode);
         Mutable<ILogicalOperator> rightIntervalSplitRef = new MutableObject<>(rightIntervalSplit);
 
         // Replicate operators
@@ -177,20 +181,36 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
         Mutable<ILogicalOperator> rightStartsSplitRef = new MutableObject<>(rightStartsSplit);
 
         // Covers Join Operator
-        ILogicalOperator leftCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode);
+        ILogicalOperator leftCoversJoin = getNestedLoop(originalIntervalJoin.getCondition(), context, mode);
         Mutable<ILogicalOperator> leftCoversJoinRef = new MutableObject<>(leftCoversJoin);
-        ILogicalOperator rightCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode);
+        ILogicalOperator rightCoversJoin = getNestedLoop(originalIntervalJoin.getCondition(), context, mode);
         Mutable<ILogicalOperator> rightCoversJoinRef = new MutableObject<>(rightCoversJoin);
 
         // Ends Join Operator
-        ILogicalOperator leftEndsJoin = getIntervalJoin(startsJoin, context, mode);
-        ILogicalOperator rightEndsJoin = getIntervalJoin(startsJoin, context, mode);
-        if (leftEndsJoin == null || rightEndsJoin == null) {
+        ILogicalOperator startsJoin = getIntervalJoin(originalIntervalJoin, context, mode);
+        ILogicalOperator leftEndsJoin = getIntervalJoin(originalIntervalJoin, context, mode);
+        ILogicalOperator rightEndsJoin = getIntervalJoin(originalIntervalJoin, context, mode);
+        if (startsJoin == null || leftEndsJoin == null || rightEndsJoin == null) {
             return false;
         }
+        Mutable<ILogicalOperator> startsJoinRef = new MutableObject<>(startsJoin);
         Mutable<ILogicalOperator> leftEndsJoinRef = new MutableObject<>(leftEndsJoin);
         Mutable<ILogicalOperator> rightEndsJoinRef = new MutableObject<>(rightEndsJoin);
 
+        // Materialize Operator
+        ILogicalOperator leftMaterialize0 = getMaterializeOperator(mode);
+        Mutable<ILogicalOperator> leftMaterialize0Ref = new MutableObject<>(leftMaterialize0);
+        ILogicalOperator leftMaterialize1 = getMaterializeOperator(mode);
+        Mutable<ILogicalOperator> leftMaterialize1Ref = new MutableObject<>(leftMaterialize1);
+        ILogicalOperator leftMaterialize2 = getMaterializeOperator(mode);
+        Mutable<ILogicalOperator> leftMaterialize2Ref = new MutableObject<>(leftMaterialize2);
+        ILogicalOperator rightMaterialize0 = getMaterializeOperator(mode);
+        Mutable<ILogicalOperator> rightMaterialize0Ref = new MutableObject<>(rightMaterialize0);
+        ILogicalOperator rightMaterialize1 = getMaterializeOperator(mode);
+        Mutable<ILogicalOperator> rightMaterialize1Ref = new MutableObject<>(rightMaterialize1);
+        ILogicalOperator rightMaterialize2 = getMaterializeOperator(mode);
+        Mutable<ILogicalOperator> rightMaterialize2Ref = new MutableObject<>(rightMaterialize2);
+
         // Union All Operator
         ILogicalOperator union1 = getUnionOperator(localLiveVars, mode);
         Mutable<ILogicalOperator> union1Ref = new MutableObject<>(union1);
@@ -201,59 +221,84 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
         ILogicalOperator union4 = getUnionOperator(localLiveVars, mode);
         Mutable<ILogicalOperator> union4Ref = new MutableObject<>(union4);
 
+        // Remove old path
+        originalIntervalJoin.getInputs().clear();
+
         // Connect main path
         connectOperators(leftIntervalSplitRef, leftSortedInput, context);
-        context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplit);
-        connectOperators(leftStartsSplitRef, leftIntervalSplitRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(leftStartsSplit);
+        context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplitRef.getValue());
+        connectOperators(leftMaterialize0Ref, leftIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftMaterialize0Ref.getValue());
+        connectOperators(leftMaterialize1Ref, leftIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftMaterialize1Ref.getValue());
+        connectOperators(leftMaterialize2Ref, leftIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftMaterialize2Ref.getValue());
+
+        connectOperators(leftStartsSplitRef, leftMaterialize0Ref, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftStartsSplitRef.getValue());
+
         connectOperators(rightIntervalSplitRef, rightSortedInput, context);
-        context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplit);
-        connectOperators(rightStartsSplitRef, rightIntervalSplitRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(rightStartsSplit);
-        updateConnections(startsJoinRef, leftStartsSplitRef, context, LEFT);
-        updateConnections(startsJoinRef, rightStartsSplitRef, context, RIGHT);
-        context.computeAndSetTypeEnvironmentForOperator(startsJoin);
-        leftStartsSplit.getOutputs().add(startsJoinRef);
-        rightStartsSplit.getOutputs().add(startsJoinRef);
+        context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplitRef.getValue());
+        connectOperators(rightMaterialize0Ref, rightIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightMaterialize0Ref.getValue());
+        connectOperators(rightMaterialize1Ref, rightIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightMaterialize1Ref.getValue());
+        connectOperators(rightMaterialize2Ref, rightIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightMaterialize2Ref.getValue());
+
+        connectOperators(rightStartsSplitRef, rightMaterialize0Ref, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightStartsSplitRef.getValue());
+
+        // Connect left and right starts path
+        connectOperators(startsJoinRef, leftStartsSplitRef, context);
+        connectOperators(startsJoinRef, rightStartsSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(startsJoinRef.getValue());
 
         // Connect left ends path
-        connectOperators(leftEndsJoinRef, leftIntervalSplitRef, context);
+        connectOperators(leftEndsJoinRef, leftMaterialize1Ref, context);
         connectOperators(leftEndsJoinRef, rightStartsSplitRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(leftEndsJoin);
-        connectOperators(union1Ref, leftEndsJoinRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftEndsJoinRef.getValue());
         connectOperators(union1Ref, startsJoinRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(union1);
-        rightStartsSplit.getOutputs().add(leftEndsJoinRef);
+        connectOperators(union1Ref, leftEndsJoinRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(union1Ref.getValue());
 
         // Connect left covers path
-        connectOperators(leftCoversJoinRef, leftIntervalSplitRef, context);
+        connectOperators(leftCoversJoinRef, leftMaterialize2Ref, context);
         connectOperators(leftCoversJoinRef, rightStartsSplitRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(leftCoversJoin);
+        context.computeAndSetTypeEnvironmentForOperator(leftCoversJoinRef.getValue());
         connectOperators(union2Ref, union1Ref, context);
         connectOperators(union2Ref, leftCoversJoinRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(union2);
-        rightStartsSplit.getOutputs().add(leftCoversJoinRef);
+        context.computeAndSetTypeEnvironmentForOperator(union2Ref.getValue());
 
         // Connect right ends path
         connectOperators(rightEndsJoinRef, leftStartsSplitRef, context);
-        connectOperators(rightEndsJoinRef, rightIntervalSplitRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(rightEndsJoin);
+        connectOperators(rightEndsJoinRef, rightMaterialize1Ref, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightEndsJoinRef.getValue());
         connectOperators(union3Ref, union2Ref, context);
         connectOperators(union3Ref, rightEndsJoinRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(union3);
-        leftStartsSplit.getOutputs().add(rightEndsJoinRef);
+        context.computeAndSetTypeEnvironmentForOperator(union3Ref.getValue());
 
         // Connect right covers path
         connectOperators(rightCoversJoinRef, leftStartsSplitRef, context);
-        connectOperators(rightCoversJoinRef, rightIntervalSplitRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(rightCoversJoin);
+        connectOperators(rightCoversJoinRef, rightMaterialize2Ref, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightCoversJoinRef.getValue());
         connectOperators(union4Ref, union3Ref, context);
         connectOperators(union4Ref, rightCoversJoinRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(union4);
-        leftStartsSplit.getOutputs().add(rightCoversJoinRef);
+        context.computeAndSetTypeEnvironmentForOperator(union4Ref.getValue());
 
         // Update context
-        opRef.setValue(union4);
+        opRef.setValue(union4Ref.getValue());
+
+        context.addToDontApplySet(this, startsJoin);
+        context.addToDontApplySet(this, leftCoversJoin);
+        context.addToDontApplySet(this, rightCoversJoin);
+        context.addToDontApplySet(this, leftCoversJoin);
+        context.addToDontApplySet(this, rightEndsJoin);
+
+        context.addToDontApplySet(this, union1);
+        context.addToDontApplySet(this, union2);
+        context.addToDontApplySet(this, union3);
+        context.addToDontApplySet(this, union4);
         return true;
     }
 
@@ -274,7 +319,7 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
         if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
             return null;
         }
-        // Check whether the function is a function we want to push.
+        // Check whether the function is a function we want to alter.
         AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
         if (!intervalJoinConditions.contains(funcExpr.getFunctionIdentifier())) {
             return null;
@@ -286,33 +331,26 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
         return null;
     }
 
-    private void connectOperators(Mutable<ILogicalOperator> from, Mutable<ILogicalOperator> to,
+    private void connectOperators(Mutable<ILogicalOperator> child, Mutable<ILogicalOperator> parent,
             IOptimizationContext context) throws AlgebricksException {
-        if (to.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
-            ILogicalOperator eo = getExchangeOperator(from.getValue().getExecutionMode());
+        if (parent.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
+            ILogicalOperator eo = getExchangeOperator(child.getValue().getExecutionMode());
             Mutable<ILogicalOperator> eoRef = new MutableObject<>(eo);
-            eo.getInputs().add(to);
-            from.getValue().getInputs().add(eoRef);
-            context.computeAndSetTypeEnvironmentForOperator(eo);
-            context.computeAndSetTypeEnvironmentForOperator(from.getValue());
-        } else {
-            from.getValue().getInputs().add(to);
-            context.computeAndSetTypeEnvironmentForOperator(from.getValue());
-        }
-    }
-
-    private void updateConnections(Mutable<ILogicalOperator> from, Mutable<ILogicalOperator> to,
-            IOptimizationContext context, int index) throws AlgebricksException {
-        if (from.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
-            ILogicalOperator eo = getExchangeOperator(from.getValue().getExecutionMode());
-            Mutable<ILogicalOperator> eoRef = new MutableObject<>(eo);
-            eo.getInputs().add(to);
-            from.getValue().getInputs().set(index, eoRef);
-            context.computeAndSetTypeEnvironmentForOperator(from.getValue());
+            eo.getInputs().add(parent);
+            if (parent.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+                ReplicateOperator ro = (ReplicateOperator) parent.getValue();
+                ro.getOutputs().add(eoRef);
+            }
+            child.getValue().getInputs().add(eoRef);
             context.computeAndSetTypeEnvironmentForOperator(eo);
+            context.computeAndSetTypeEnvironmentForOperator(child.getValue());
         } else {
-            from.getValue().getInputs().set(index, to);
-            context.computeAndSetTypeEnvironmentForOperator(from.getValue());
+            if (parent.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+                ReplicateOperator ro = (ReplicateOperator) parent.getValue();
+                ro.getOutputs().add(child);
+            }
+            child.getValue().getInputs().add(parent);
+            context.computeAndSetTypeEnvironmentForOperator(child.getValue());
         }
     }
 
@@ -323,25 +361,28 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
         return eo;
     }
 
-    private ILogicalOperator getIntervalSplitOperator(LogicalVariable key, IRangeMap rangeMap, ExecutionMode mode) {
+    private ReplicateOperator getIntervalSplitOperator(LogicalVariable key, IRangeMap rangeMap, ExecutionMode mode) {
         List<LogicalVariable> joinKeyLogicalVars = new ArrayList<>();
         joinKeyLogicalVars.add(key);
         //create the logical and physical operator
-        IntervalLocalRangeSplitterOperator splitOperator = new IntervalLocalRangeSplitterOperator(joinKeyLogicalVars);
+        boolean[] flags = new boolean[2];
+        for (int i = 0; i < flags.length; ++i) {
+            flags[i] = true;
+        }
+        ReplicateOperator splitOperator = new ReplicateOperator(flags.length, flags);
+        //        ReplicatePOperator splitPOperator = new ReplicatePOperator();
         IntervalLocalRangeSplitterPOperator splitPOperator = new IntervalLocalRangeSplitterPOperator(joinKeyLogicalVars,
                 rangeMap);
         splitOperator.setPhysicalOperator(splitPOperator);
         splitOperator.setExecutionMode(mode);
-
-        //create ExtensionOperator and put the commitOperator in it.
-        ExtensionOperator extensionOperator = new ExtensionOperator(splitOperator);
-        extensionOperator.setPhysicalOperator(splitPOperator);
-        extensionOperator.setExecutionMode(mode);
-        return extensionOperator;
+        return splitOperator;
     }
 
     private ReplicateOperator getReplicateOperator(int outputArity, ExecutionMode mode) {
         boolean[] flags = new boolean[outputArity];
+        for (int i = 0; i < flags.length; ++i) {
+            flags[i] = true;
+        }
         ReplicateOperator ro = new ReplicateOperator(flags.length, flags);
         ReplicatePOperator rpo = new ReplicatePOperator();
         ro.setPhysicalOperator(rpo);
@@ -349,6 +390,14 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
         return ro;
     }
 
+    private ILogicalOperator getMaterializeOperator(ExecutionMode mode) {
+        MaterializeOperator mo = new MaterializeOperator();
+        MaterializePOperator mpo = new MaterializePOperator(false);
+        mo.setPhysicalOperator(mpo);
+        mo.setExecutionMode(mode);
+        return mo;
+    }
+
     private ILogicalOperator getNestedLoop(Mutable<ILogicalExpression> condition, IOptimizationContext context,
             ExecutionMode mode) {
         int memoryJoinSize = context.getPhysicalOptimizationConfig().getMaxFramesForJoin();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
index aad0cf3..799a6af 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 
 public abstract class AbstractExchangePOperator extends AbstractPhysicalOperator {
+    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
index 818e1ec..083e4d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
@@ -35,9 +35,6 @@ import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 
 public class OneToOneExchangePOperator extends AbstractExchangePOperator {
 
-    public OneToOneExchangePOperator() {
-    }
-
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE;
@@ -59,7 +56,7 @@ public class OneToOneExchangePOperator extends AbstractExchangePOperator {
     public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
         IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
-        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.SAME_COUNT);
+        return new Pair<>(conn, TargetConstraint.SAME_COUNT);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index af40250..a1c6164 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -49,19 +49,19 @@ public class JobBuilder implements IHyracksJobBuilder {
     private final AlgebricksAbsolutePartitionConstraint clusterLocations;
     private final AlgebricksAbsolutePartitionConstraint countOneLocation;
 
-    private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> outEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>();
-    private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> inEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>();
-    private final Map<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>> connectors = new HashMap<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>>();
+    private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> outEdges = new HashMap<>();
+    private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> inEdges = new HashMap<>();
+    private final Map<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>> connectors = new HashMap<>();
 
-    private final Map<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>> microOps = new HashMap<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>>();
-    private final Map<IPushRuntimeFactory, ILogicalOperator> revMicroOpMap = new HashMap<IPushRuntimeFactory, ILogicalOperator>();
-    private final Map<ILogicalOperator, IOperatorDescriptor> hyracksOps = new HashMap<ILogicalOperator, IOperatorDescriptor>();
-    private final Map<ILogicalOperator, AlgebricksPartitionConstraint> pcForMicroOps = new HashMap<ILogicalOperator, AlgebricksPartitionConstraint>();
+    private final Map<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>> microOps = new HashMap<>();
+    private final Map<IPushRuntimeFactory, ILogicalOperator> revMicroOpMap = new HashMap<>();
+    private final Map<ILogicalOperator, IOperatorDescriptor> hyracksOps = new HashMap<>();
+    private final Map<ILogicalOperator, AlgebricksPartitionConstraint> pcForMicroOps = new HashMap<>();
 
-    private final Map<ILogicalOperator, Integer> algebraicOpBelongingToMetaAsterixOp = new HashMap<ILogicalOperator, Integer>();
-    private final Map<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>> metaAsterixOpSkeletons = new HashMap<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>>();
-    private final Map<Integer, AlgebricksMetaOperatorDescriptor> metaAsterixOps = new HashMap<Integer, AlgebricksMetaOperatorDescriptor>();
-    private final Map<IOperatorDescriptor, AlgebricksPartitionConstraint> partitionConstraintMap = new HashMap<IOperatorDescriptor, AlgebricksPartitionConstraint>();
+    private final Map<ILogicalOperator, Integer> algebraicOpBelongingToMetaAsterixOp = new HashMap<>();
+    private final Map<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>> metaAsterixOpSkeletons = new HashMap<>();
+    private final Map<Integer, AlgebricksMetaOperatorDescriptor> metaAsterixOps = new HashMap<>();
+    private final Map<IOperatorDescriptor, AlgebricksPartitionConstraint> partitionConstraintMap = new HashMap<>();
 
     private int aodCounter = 0;
 
@@ -123,14 +123,14 @@ public class JobBuilder implements IHyracksJobBuilder {
             int destInputIndex) {
         ArrayList<ILogicalOperator> outputs = outEdges.get(src);
         if (outputs == null) {
-            outputs = new ArrayList<ILogicalOperator>();
+            outputs = new ArrayList<>();
             outEdges.put(src, outputs);
         }
         addAtPos(outputs, dest, srcOutputIndex);
 
         ArrayList<ILogicalOperator> inp = inEdges.get(dest);
         if (inp == null) {
-            inp = new ArrayList<ILogicalOperator>();
+            inp = new ArrayList<>();
             inEdges.put(dest, inp);
         }
         addAtPos(inp, src, destInputIndex);
@@ -270,7 +270,7 @@ public class JobBuilder implements IHyracksJobBuilder {
     }
 
     private Map<IConnectorDescriptor, TargetConstraint> setupConnectors() throws AlgebricksException {
-        Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = new HashMap<IConnectorDescriptor, TargetConstraint>();
+        Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = new HashMap<>();
         for (ILogicalOperator exchg : connectors.keySet()) {
             ILogicalOperator inOp = inEdges.get(exchg).get(0);
             ILogicalOperator outOp = outEdges.get(exchg).get(0);