You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/01/30 12:28:22 UTC

asterixdb git commit: [ASTERIXDB-2263][RT] Use Plan Stages To Estimate Resources

Repository: asterixdb
Updated Branches:
  refs/heads/master 5aa29b86f -> b2abe1e90


[ASTERIXDB-2263][RT] Use Plan Stages To Estimate Resources

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Introduce PlanStagesGenerator that generates
  plan stages using blocking/two-phased operators.
- Introduce OperatorResourcesComputer that
  calculates the estimated resources for any
  logical operator.
- Estimate a job's required resources based on
  the stage that requires the most resources in
  the plan.

Change-Id: Ic715c5733621e27049677f44e1ddaa0dd2c71baf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2299
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b2abe1e9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b2abe1e9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b2abe1e9

Branch: refs/heads/master
Commit: b2abe1e90ee87d49265d696562f91aab6356315a
Parents: 5aa29b8
Author: Murtadha Hubail <mh...@apache.org>
Authored: Tue Jan 30 02:55:08 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Tue Jan 30 04:27:45 2018 -0800

----------------------------------------------------------------------
 .../app/resource/OperatorResourcesComputer.java | 120 ++++++
 .../apache/asterix/app/resource/PlanStage.java  |  52 +++
 .../app/resource/PlanStagesGenerator.java       | 427 +++++++++++++++++++
 .../org/apache/asterix/utils/ResourceUtils.java |  36 +-
 .../app/resource/PlanStagesGeneratorTest.java   | 307 +++++++++++++
 .../logical/AbstractReplicateOperator.java      |   8 +
 6 files changed, 941 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
new file mode 100644
index 0000000..7eaaa3d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.resource;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+
+public class OperatorResourcesComputer {
+
+    public static final int MIN_OPERATOR_CORES = 1;
+    private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+
+    private final int numComputationPartitions;
+    private final long groupByMemorySize;
+    private final long joinMemorySize;
+    private final long sortMemorySize;
+    private final long frameSize;
+
+    public OperatorResourcesComputer(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit,
+            int joinFrameLimit, long frameSize) {
+        this.numComputationPartitions = numComputationPartitions;
+        this.groupByMemorySize = groupFrameLimit * frameSize;
+        this.joinMemorySize = joinFrameLimit * frameSize;
+        this.sortMemorySize = sortFrameLimit * frameSize;
+        this.frameSize = frameSize;
+    }
+
+    public int getOperatorRequiredCores(ILogicalOperator operator) {
+        if (operator.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || operator.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+            return numComputationPartitions;
+        }
+        return MIN_OPERATOR_CORES;
+    }
+
+    public long getOperatorRequiredMemory(ILogicalOperator operator) {
+        switch (operator.getOperatorTag()) {
+            case AGGREGATE:
+            case ASSIGN:
+            case DATASOURCESCAN:
+            case DISTINCT:
+            case DISTRIBUTE_RESULT:
+            case EMPTYTUPLESOURCE:
+            case DELEGATE_OPERATOR:
+            case EXTERNAL_LOOKUP:
+            case LEFT_OUTER_UNNEST_MAP:
+            case LIMIT:
+            case MATERIALIZE:
+            case NESTEDTUPLESOURCE:
+            case PROJECT:
+            case REPLICATE:
+            case RUNNINGAGGREGATE:
+            case SCRIPT:
+            case SELECT:
+            case SINK:
+            case SPLIT:
+            case SUBPLAN:
+            case TOKENIZE:
+            case UNIONALL:
+            case UNNEST:
+            case LEFT_OUTER_UNNEST:
+            case UNNEST_MAP:
+            case UPDATE:
+            case WRITE:
+            case WRITE_RESULT:
+            case INDEX_INSERT_DELETE_UPSERT:
+            case INSERT_DELETE_UPSERT:
+            case INTERSECT:
+                return getOperatorRequiredMemory(operator, frameSize);
+            case EXCHANGE:
+                return getExchangeRequiredMemory((ExchangeOperator) operator);
+            case GROUP:
+                return getOperatorRequiredMemory(operator, groupByMemorySize);
+            case ORDER:
+                return getOperatorRequiredMemory(operator, sortMemorySize);
+            case INNERJOIN:
+            case LEFTOUTERJOIN:
+                return getOperatorRequiredMemory(operator, joinMemorySize);
+            default:
+                throw new IllegalStateException("Unrecognized operator: " + operator.getOperatorTag());
+        }
+    }
+
+    private long getOperatorRequiredMemory(ILogicalOperator op, long memorySize) {
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+            return memorySize * numComputationPartitions;
+        }
+        return memorySize;
+    }
+
+    private long getExchangeRequiredMemory(ExchangeOperator op) {
+        final IPhysicalOperator physicalOperator = op.getPhysicalOperator();
+        final PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag();
+        if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE
+                || physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
+            return getOperatorRequiredMemory(op, frameSize);
+        }
+        return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java
new file mode 100644
index 0000000..1e623c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.resource;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+
+public class PlanStage {
+
+    private final Set<ILogicalOperator> operators = new HashSet<>();
+    private final int stageId;
+
+    PlanStage(int stageId) {
+        this.stageId = stageId;
+    }
+
+    @Override
+    public String toString() {
+        return "Stage{" + "stageId=" + stageId + ", operators(" + operators.size() + ")" + "=" + operators + '}';
+    }
+
+    public Set<ILogicalOperator> getOperators() {
+        return operators;
+    }
+
+    public long getRequiredMemory(OperatorResourcesComputer resourcesComputer) {
+        return operators.stream().mapToLong(resourcesComputer::getOperatorRequiredMemory).sum();
+    }
+
+    public int getRequiredCores(OperatorResourcesComputer resourcesComputer) {
+        return operators.stream().mapToInt(resourcesComputer::getOperatorRequiredCores).max()
+                .orElse(OperatorResourcesComputer.MIN_OPERATOR_CORES);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java
new file mode 100644
index 0000000..8b32375
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.resource;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public class PlanStagesGenerator implements ILogicalOperatorVisitor<Void, Void> {
+
+    private static final int JOIN_FIRST_INPUT = 1;
+    private static final int JOIN_SECOND_INPUT = 2;
+    private final Set<ILogicalOperator> visitedOperators = new HashSet<>();
+    private final LinkedList<ILogicalOperator> pendingBlockingOperators = new LinkedList<>();
+    private final List<PlanStage> stages = new ArrayList<>();
+    private PlanStage currentStage;
+    private int stageCounter;
+
+    public PlanStagesGenerator() {
+        currentStage = new PlanStage(++stageCounter);
+        stages.add(currentStage);
+    }
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        // Makes sure that the downstream of a replicate operator is only visited once.
+        if (!visitedOperators.contains(op)) {
+            visitedOperators.add(op);
+            visit(op);
+        } else {
+            merge(op);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+        // Makes sure that the downstream of a split operator is only visited once.
+        if (!visitedOperators.contains(op)) {
+            visitedOperators.add(op);
+            visit(op);
+        } else {
+            merge(op);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+            throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        visit(op);
+        return null;
+    }
+
+    public List<PlanStage> getStages() {
+        return stages;
+    }
+
+    private void visit(ILogicalOperator op) throws AlgebricksException {
+        addToStage(op);
+        if (!pendingBlockingOperators.isEmpty()) {
+            final ILogicalOperator firstPending = pendingBlockingOperators.pop();
+            visitBlocking(firstPending);
+        }
+    }
+
+    private void visitBlocking(ILogicalOperator blockingOp) throws AlgebricksException {
+        final PlanStage blockingOpStage = new PlanStage(++stageCounter);
+        blockingOpStage.getOperators().add(blockingOp);
+        stages.add(blockingOpStage);
+        currentStage = blockingOpStage;
+        switch (blockingOp.getOperatorTag()) {
+            case INNERJOIN:
+            case LEFTOUTERJOIN:
+                // visit only the second input
+                ILogicalOperator joinSecondInput = getJoinOperatorInput(blockingOp, JOIN_SECOND_INPUT);
+                joinSecondInput.accept(this, null);
+                break;
+            case GROUP:
+            case ORDER:
+                visitInputs(blockingOp);
+                break;
+            default:
+                throw new IllegalStateException("Unrecognized blocking operator: " + blockingOp.getOperatorTag());
+        }
+    }
+
+    private void addToStage(ILogicalOperator op) throws AlgebricksException {
+        currentStage.getOperators().add(op);
+        switch (op.getOperatorTag()) {
+            case INNERJOIN:
+            case LEFTOUTERJOIN:
+                pendingBlockingOperators.add(op);
+                // continue on the same stage
+                final ILogicalOperator joinFirstInput = getJoinOperatorInput(op, JOIN_FIRST_INPUT);
+                joinFirstInput.accept(this, null);
+                break;
+            case GROUP:
+                if (isBlockingGroupBy((GroupByOperator) op)) {
+                    pendingBlockingOperators.add(op);
+                    return;
+                }
+                // continue on the same stage
+                visitInputs(op);
+                break;
+            case ORDER:
+                pendingBlockingOperators.add(op);
+                break;
+            default:
+                visitInputs(op);
+                break;
+        }
+    }
+
+    private void visitInputs(ILogicalOperator op) throws AlgebricksException {
+        if (isMaterialized(op)) {
+            // don't visit the inputs of this operator since it is supposed to be blocking due to materialization.
+            // some other non-blocking operator will visit those inputs when reached.
+            return;
+        }
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            inputOpRef.getValue().accept(this, null);
+        }
+    }
+
+    private boolean isBlockingGroupBy(GroupByOperator op) {
+        return op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.EXTERNAL_GROUP_BY
+                || op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.SORT_GROUP_BY;
+    }
+
+    /**
+     * Checks whether the operator {@code op} is supposed to be materialized
+     * due to a replicate/split operator.
+     *
+     * @param op
+     * @return true if the operator will be materialized. Otherwise false
+     */
+    private boolean isMaterialized(ILogicalOperator op) {
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            final ILogicalOperator inputOp = inputOpRef.getValue();
+            final LogicalOperatorTag inputOpTag = inputOp.getOperatorTag();
+            if (inputOpTag == LogicalOperatorTag.REPLICATE || inputOpTag == LogicalOperatorTag.SPLIT) {
+                final AbstractReplicateOperator replicateOperator = (AbstractReplicateOperator) inputOp;
+                if (replicateOperator.isMaterialized(op)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private ILogicalOperator getJoinOperatorInput(ILogicalOperator op, int inputNum) {
+        if (inputNum != JOIN_FIRST_INPUT && inputNum != JOIN_SECOND_INPUT) {
+            throw new IllegalArgumentException("invalid input number for join operator");
+        }
+        final List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+        if (inputs.size() != 2) {
+            throw new IllegalStateException("Join must have exactly two inputs. Current inputs: " + inputs.size());
+        }
+        return op.getInputs().get(inputNum - 1).getValue();
+    }
+
+    /**
+     * Merges all operators of the current stage into the stage that already contains {@code op}.
+     *
+     * @param op
+     */
+    private void merge(ILogicalOperator op) {
+        // all operators in this stage belong to the stage of the already visited op
+        for (PlanStage stage : stages) {
+            if (stage != currentStage && stage.getOperators().contains(op)) {
+                stage.getOperators().addAll(currentStage.getOperators());
+                stages.remove(currentStage);
+                currentStage = stage;
+                break;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index ccda1e7..0149ffa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -19,7 +19,11 @@
 
 package org.apache.asterix.utils;
 
-import org.apache.asterix.app.resource.RequiredCapacityVisitor;
+import java.util.List;
+
+import org.apache.asterix.app.resource.OperatorResourcesComputer;
+import org.apache.asterix.app.resource.PlanStage;
+import org.apache.asterix.app.resource.PlanStagesGenerator;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -54,16 +58,30 @@ public class ResourceUtils {
         final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
         final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
         final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
+        final List<PlanStage> planStages = getStages(plan);
+        return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, sortFrameLimit,
+                groupFrameLimit, joinFrameLimit, frameSize);
+    }
 
-        // Creates a cluster capacity visitor.
-        IClusterCapacity clusterCapacity = new ClusterCapacity();
-        RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length,
-                sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, clusterCapacity);
-
+    public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
         // There could be only one root operator for a top-level query plan.
-        ILogicalOperator rootOp = plan.getRoots().get(0).getValue();
-        rootOp.accept(visitor, null);
-        return clusterCapacity;
+        final ILogicalOperator rootOp = plan.getRoots().get(0).getValue();
+        final PlanStagesGenerator stagesGenerator = new PlanStagesGenerator();
+        rootOp.accept(stagesGenerator, null);
+        return stagesGenerator.getStages();
     }
 
+    public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
+            int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int frameSize) {
+        final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, sortFrameLimit,
+                groupFrameLimit, joinFrameLimit, frameSize);
+        final IClusterCapacity clusterCapacity = new ClusterCapacity();
+        final Long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
+                .orElseThrow(IllegalStateException::new);
+        clusterCapacity.setAggregatedMemoryByteSize(maxRequiredMemory);
+        final Integer maxRequireCores = stages.stream().mapToInt(stage -> stage.getRequiredCores(computer)).max()
+                .orElseThrow(IllegalStateException::new);
+        clusterCapacity.setAggregatedCores(maxRequireCores);
+        return clusterCapacity;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
new file mode 100644
index 0000000..0e55b1e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.resource;
+
+import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.GROUP;
+import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.INNERJOIN;
+import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.LEFTOUTERJOIN;
+import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.ORDER;
+import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.LOCAL;
+import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.PARTITIONED;
+import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import org.apache.asterix.utils.ResourceUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PlanStagesGeneratorTest {
+
+    // Logical operator tags that validateOperatorStages expects to find in exactly two stages of a
+    // multi-stage plan (GROUP is handled separately there, based on its physical operator).
+    private static final Set<LogicalOperatorTag> BLOCKING_OPERATORS =
+            new HashSet<>(Arrays.asList(INNERJOIN, LEFTOUTERJOIN, ORDER));
+    // 32 MiB; only used to derive FRAME_LIMIT.
+    private static final long MEMORY_BUDGET = 33554432L;
+    // 32 KiB; passed as the frame size when computing the required capacity.
+    private static final int FRAME_SIZE = 32768;
+    // Number of FRAME_SIZE frames that fit in MEMORY_BUDGET (1024); passed as the sort, group and join frame limit.
+    private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE);
+    // Passed as the number of computation locations when computing the required capacity.
+    private static final int PARALLELISM = 10;
+    // Factor in the expected memory of the hash-partition exchange in testJoinGroupby.
+    private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+
+    /**
+     * Plan under test (all operators UNPARTITIONED):
+     * distribute-result &lt;- one-to-one exchange &lt;- assign &lt;- empty-tuple-source.
+     * Expects a single stage holding all four operators, requiring one frame per operator.
+     */
+    @Test
+    public void noBlockingPlan() throws AlgebricksException {
+        final EmptyTupleSourceOperator source = new EmptyTupleSourceOperator();
+        source.setExecutionMode(UNPARTITIONED);
+
+        final AssignOperator assign = new AssignOperator(Collections.emptyList(), null);
+        assign.setExecutionMode(UNPARTITIONED);
+        assign.getInputs().add(new MutableObject<>(source));
+
+        final ExchangeOperator oneToOne = new ExchangeOperator();
+        oneToOne.setExecutionMode(UNPARTITIONED);
+        oneToOne.setPhysicalOperator(new OneToOneExchangePOperator());
+        oneToOne.getInputs().add(new MutableObject<>(assign));
+
+        final DistributeResultOperator root = new DistributeResultOperator(null, null);
+        root.setExecutionMode(UNPARTITIONED);
+        root.getInputs().add(new MutableObject<>(oneToOne));
+
+        final List<PlanStage> stages =
+                ResourceUtils.getStages(new ALogicalPlanImpl(Collections.singletonList(new MutableObject(root))));
+
+        // nothing blocks, so the whole plan must collapse into one stage
+        Assert.assertEquals(1, stages.size());
+        validateStages(stages, root, oneToOne, source, assign);
+
+        // every operator of that stage accounts for exactly one frame
+        final long singleStageMemory = FRAME_SIZE * stages.get(0).getOperators().size();
+        assertRequiredMemory(stages, singleStageMemory);
+    }
+
+    /**
+     * Plan under test (all operators PARTITIONED):
+     * distribute-result &lt;- order &lt;- pre-clustered group-by &lt;- one-to-one exchange &lt;- scan
+     * &lt;- empty-tuple-source.
+     * Expects two stages, and checks the required memory against the stage that runs from the order operator
+     * down to the empty-tuple-source.
+     */
+    @Test
+    public void testNonBlockingGroupByOrderBy() throws AlgebricksException {
+        EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+        ets.setExecutionMode(PARTITIONED);
+
+        DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
+        scanOperator.setExecutionMode(PARTITIONED);
+        scanOperator.getInputs().add(new MutableObject<>(ets));
+
+        ExchangeOperator exchange = new ExchangeOperator();
+        exchange.setExecutionMode(PARTITIONED);
+        exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+        exchange.getInputs().add(new MutableObject<>(scanOperator));
+
+        GroupByOperator groupByOperator = new GroupByOperator();
+        groupByOperator.setExecutionMode(PARTITIONED);
+        groupByOperator
+                .setPhysicalOperator(new PreclusteredGroupByPOperator(Collections.emptyList(), true, FRAME_LIMIT));
+        groupByOperator.getInputs().add(new MutableObject<>(exchange));
+
+        OrderOperator orderOperator = new OrderOperator();
+        orderOperator.setExecutionMode(PARTITIONED);
+        orderOperator.getInputs().add(new MutableObject<>(groupByOperator));
+
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.getInputs().add(new MutableObject<>(orderOperator));
+        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+
+        final List<PlanStage> stages = ResourceUtils.getStages(plan);
+        // the scan is validated too: the expected memory below counts it as part of the dominating stage
+        validateStages(stages, ets, scanOperator, exchange, groupByOperator, orderOperator, resultOperator);
+        // ensure 2 stages (root to order, order to ets)
+        final int expectedStages = 2;
+        Assert.assertEquals(expectedStages, stages.size());
+
+        // dominating stage should have orderBy, orderBy's input (groupby), groupby's input (exchange),
+        // exchange's input (scanOperator), and scanOperator's input (ets)
+        long orderOperatorRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long groupByOperatorRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long exchangeRequiredMemory = PARALLELISM * FRAME_SIZE;
+        long scanOperatorRequiredMemory = PARALLELISM * FRAME_SIZE;
+        long etsRequiredMemory = FRAME_SIZE * PARALLELISM;
+
+        final long expectedMemory = orderOperatorRequiredMemory + groupByOperatorRequiredMemory + exchangeRequiredMemory
+                + scanOperatorRequiredMemory + etsRequiredMemory;
+        assertRequiredMemory(stages, expectedMemory);
+    }
+
+    /**
+     * Plan under test: distribute-result &lt;- left-outer-join(exchange1, exchange2), where
+     * exchange1 &lt;- inner-join(scan1 &lt;- ets1, scan2 &lt;- ets2) and
+     * exchange2 &lt;- external group-by (LOCAL) &lt;- ets3; both exchanges are hash-partition exchanges.
+     * Expects four stages, and checks the required memory against the stage that follows the first input of
+     * each join from the result operator down to ets1.
+     */
+    @Test
+    public void testJoinGroupby() throws AlgebricksException {
+        EmptyTupleSourceOperator ets1 = new EmptyTupleSourceOperator();
+        ets1.setExecutionMode(PARTITIONED);
+
+        DataSourceScanOperator scanOperator1 = new DataSourceScanOperator(Collections.emptyList(), null);
+        scanOperator1.setExecutionMode(PARTITIONED);
+        scanOperator1.getInputs().add(new MutableObject<>(ets1));
+
+        EmptyTupleSourceOperator ets2 = new EmptyTupleSourceOperator();
+        // configure ets2 itself (this used to set the mode on ets1 a second time)
+        ets2.setExecutionMode(PARTITIONED);
+
+        DataSourceScanOperator scanOperator2 = new DataSourceScanOperator(Collections.emptyList(), null);
+        scanOperator2.setExecutionMode(PARTITIONED);
+        scanOperator2.getInputs().add(new MutableObject<>(ets2));
+
+        InnerJoinOperator firstJoin = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+        firstJoin.setExecutionMode(PARTITIONED);
+        firstJoin.getInputs().add(new MutableObject<>(scanOperator1));
+        firstJoin.getInputs().add(new MutableObject<>(scanOperator2));
+
+        ExchangeOperator exchangeOperator1 = new ExchangeOperator();
+        exchangeOperator1.setExecutionMode(PARTITIONED);
+        exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
+
+        EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
+        // configure ets3 itself (this used to set the mode on ets1 a third time)
+        ets3.setExecutionMode(PARTITIONED);
+
+        GroupByOperator groupByOperator = new GroupByOperator();
+        groupByOperator
+                .setPhysicalOperator(new ExternalGroupByPOperator(Collections.emptyList(), FRAME_LIMIT, FRAME_LIMIT));
+        groupByOperator.setExecutionMode(LOCAL);
+        groupByOperator.getInputs().add(new MutableObject<>(ets3));
+
+        ExchangeOperator exchangeOperator2 = new ExchangeOperator();
+        exchangeOperator2.setExecutionMode(PARTITIONED);
+        exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchangeOperator2.getInputs().add(new MutableObject<>(groupByOperator));
+
+        LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+        secondJoin.setExecutionMode(PARTITIONED);
+        secondJoin.getInputs().add(new MutableObject<>(exchangeOperator1));
+        secondJoin.getInputs().add(new MutableObject<>(exchangeOperator2));
+
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.getInputs().add(new MutableObject<>(secondJoin));
+        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+
+        List<PlanStage> stages = ResourceUtils.getStages(plan);
+        final int expectedStages = 4;
+        Assert.assertEquals(expectedStages, stages.size());
+        validateStages(stages, ets1, scanOperator1, ets2, scanOperator2, firstJoin, exchangeOperator1, ets3,
+                groupByOperator, exchangeOperator2, secondJoin, resultOperator);
+
+        // dominating stage should have the following operators:
+        // resultOperator, its input (secondJoin), secondJoin's first input (exchangeOperator1), exchangeOperator1's
+        // input (firstJoin), firstJoin's first input (scanOperator1), and scanOperator1's input (ets1)
+        long resultOperatorRequiredMemory = FRAME_SIZE * PARALLELISM;
+        long secondJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long exchangeOperator1RequiredMemory = 2 * MAX_BUFFER_PER_CONNECTION * PARALLELISM * PARALLELISM * FRAME_SIZE;
+        long firstJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long scanOperator1RequiredMemory = FRAME_SIZE * PARALLELISM;
+        long ets1RequiredMemory = FRAME_SIZE * PARALLELISM;
+
+        long expectedMemory = resultOperatorRequiredMemory + secondJoinRequiredMemory + exchangeOperator1RequiredMemory
+                + firstJoinRequiredMemory + scanOperator1RequiredMemory + ets1RequiredMemory;
+        assertRequiredMemory(stages, expectedMemory);
+    }
+
+    /**
+     * Plan under test (all operators PARTITIONED):
+     * distribute-result &lt;- left-outer-join(order, order), where both order operators read from the same
+     * replicate &lt;- scan &lt;- empty-tuple-source.
+     * Expects three stages, and checks the required memory against the stage that holds the join, its second
+     * input and everything below the replicate operator.
+     */
+    @Test
+    public void testReplicateSortJoin() throws AlgebricksException {
+        final EmptyTupleSourceOperator source = new EmptyTupleSourceOperator();
+        source.setExecutionMode(PARTITIONED);
+
+        final DataSourceScanOperator scan = new DataSourceScanOperator(Collections.emptyList(), null);
+        scan.setExecutionMode(PARTITIONED);
+        scan.getInputs().add(new MutableObject<>(source));
+
+        final ReplicateOperator replicate = new ReplicateOperator(2);
+        replicate.setExecutionMode(PARTITIONED);
+        replicate.getInputs().add(new MutableObject<>(scan));
+
+        // two sorts fed by the same replicate operator
+        final OrderOperator firstSort = new OrderOperator();
+        firstSort.setExecutionMode(PARTITIONED);
+        firstSort.setPhysicalOperator(new OneToOneExchangePOperator());
+        firstSort.getInputs().add(new MutableObject<>(replicate));
+
+        final OrderOperator secondSort = new OrderOperator();
+        secondSort.setExecutionMode(PARTITIONED);
+        secondSort.setPhysicalOperator(new OneToOneExchangePOperator());
+        secondSort.getInputs().add(new MutableObject<>(replicate));
+
+        final LeftOuterJoinOperator join = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+        join.setExecutionMode(PARTITIONED);
+        join.getInputs().add(new MutableObject<>(firstSort));
+        join.getInputs().add(new MutableObject<>(secondSort));
+
+        final DistributeResultOperator root = new DistributeResultOperator(null, null);
+        root.setExecutionMode(PARTITIONED);
+        root.getInputs().add(new MutableObject<>(join));
+
+        final List<PlanStage> stages =
+                ResourceUtils.getStages(new ALogicalPlanImpl(Collections.singletonList(new MutableObject(root))));
+        Assert.assertEquals(3, stages.size());
+        validateStages(stages);
+
+        // dominating stage should have the following operators:
+        // the join, the join's second input (secondSort), secondSort's input (replicate),
+        // replicate's input (scan), and scan's input (source)
+        final long frameLimitedOperatorMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        final long singleFrameOperatorMemory = FRAME_SIZE * PARALLELISM;
+        // join + secondSort use the frame limit; replicate + scan + source use a single frame each
+        final long dominatingStageMemory = 2 * frameLimitedOperatorMemory + 3 * singleFrameOperatorMemory;
+        assertRequiredMemory(stages, dominatingStageMemory);
+    }
+
+    /**
+     * Checks that each of the given operators is part of at least one stage, then checks for every operator found
+     * in any stage that it appears in the expected number of stages.
+     *
+     * @param stages the generated plan stages
+     * @param operators operators that must be present in the stages; may be empty
+     */
+    private void validateStages(List<PlanStage> stages, ILogicalOperator... operators) {
+        // presence check for the operators the caller listed explicitly
+        for (ILogicalOperator expected : operators) {
+            ensureOperatorExists(stages, expected);
+        }
+        // appearance-count check for everything the stages actually contain
+        stages.forEach(stage -> stage.getOperators().forEach(member -> validateOperatorStages(stages, member)));
+    }
+
+    /**
+     * Asserts that at least one of the stages contains the given operator.
+     */
+    private void ensureOperatorExists(List<PlanStage> stages, ILogicalOperator operator) {
+        final boolean present = stages.stream().anyMatch(stage -> stage.getOperators().contains(operator));
+        Assert.assertTrue(present);
+    }
+
+    /**
+     * Asserts that the operator appears in the expected number of stages: two for the tags in
+     * {@link #BLOCKING_OPERATORS} and for group-by operators whose physical operator is an external or sort
+     * group-by, one for everything else. Nothing is checked when the plan has a single stage.
+     */
+    private void validateOperatorStages(List<PlanStage> stages, ILogicalOperator operator) {
+        if (stages.size() == 1) {
+            return;
+        }
+        final LogicalOperatorTag logicalTag = operator.getOperatorTag();
+        boolean spansTwoStages = BLOCKING_OPERATORS.contains(logicalTag);
+        if (logicalTag == GROUP) {
+            // whether a group-by spans two stages depends on how it is physically executed
+            final PhysicalOperatorTag physicalTag =
+                    ((GroupByOperator) operator).getPhysicalOperator().getOperatorTag();
+            spansTwoStages = spansTwoStages || physicalTag == PhysicalOperatorTag.EXTERNAL_GROUP_BY
+                    || physicalTag == PhysicalOperatorTag.SORT_GROUP_BY;
+        }
+        final long expectedAppearances = spansTwoStages ? 2 : 1;
+        long appearances = 0;
+        for (PlanStage stage : stages) {
+            if (stage.getOperators().contains(operator)) {
+                appearances++;
+            }
+        }
+        Assert.assertEquals(expectedAppearances, appearances);
+    }
+
+    /**
+     * Asserts that the capacity computed for the stages, using {@link #PARALLELISM} computation locations,
+     * {@link #FRAME_LIMIT} as the sort, group and join frame limit and {@link #FRAME_SIZE} as the frame size,
+     * reports exactly the expected aggregated memory.
+     *
+     * @param stages the generated plan stages
+     * @param expectedMemory the expected aggregated memory byte size
+     */
+    private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
+        final IClusterCapacity clusterCapacity = ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM,
+                FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
+        // JUnit expects (expected, actual); the reverse order mislabels the values in the failure message
+        Assert.assertEquals(expectedMemory, clusterCapacity.getAggregatedMemoryByteSize());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
index 852c392..3bb0f47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -109,4 +109,12 @@ public abstract class AbstractReplicateOperator extends AbstractLogicalOperator
         return false;
     }
 
+    public boolean isMaterialized(ILogicalOperator op) {
+        for (int i = 0; i < outputs.size(); i++) {
+            if (outputs.get(i).getValue() == op) {
+                return outputMaterializationFlags[i];
+            }
+        }
+        return false;
+    }
 }