You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2018/10/16 04:18:33 UTC

[03/21] asterixdb git commit: [ASTERIXDB-2286][COMP][FUN][HYR] Parallel Sort Optimization

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 07ff0ab..d879d36 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.algebricks.core.rewriter.base;
 
 import java.util.Properties;
 
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+
 public class PhysicalOptimizationConfig {
     private static final int MB = 1048576;
 
@@ -31,10 +33,11 @@ public class PhysicalOptimizationConfig {
     private static final String MAX_FRAMES_FOR_TEXTSEARCH = "MAX_FRAMES_FOR_TEXTSEARCH";
     private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
     private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
-
     private static final String DEFAULT_HASH_GROUP_TABLE_SIZE = "DEFAULT_HASH_GROUP_TABLE_SIZE";
     private static final String DEFAULT_EXTERNAL_GROUP_TABLE_SIZE = "DEFAULT_EXTERNAL_GROUP_TABLE_SIZE";
     private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE";
+    private static final String SORT_PARALLEL = "SORT_PARALLEL";
+    private static final String SORT_SAMPLES = "SORT_SAMPLES";
 
     private Properties properties = new Properties();
 
@@ -143,6 +146,22 @@ public class PhysicalOptimizationConfig {
         setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, tableSize);
     }
 
+    public boolean getSortParallel() {
+        return getBoolean(SORT_PARALLEL, AlgebricksConfig.SORT_PARALLEL);
+    }
+
+    public void setSortParallel(boolean sortParallel) {
+        setBoolean(SORT_PARALLEL, sortParallel);
+    }
+
+    public int getSortSamples() {
+        return getInt(SORT_SAMPLES, AlgebricksConfig.SORT_SAMPLES);
+    }
+
+    public void setSortSamples(int sortSamples) {
+        setInt(SORT_SAMPLES, sortSamples);
+    }
+
     private void setInt(String property, int value) {
         properties.setProperty(property, Integer.toString(value));
     }
@@ -167,4 +186,16 @@ public class PhysicalOptimizationConfig {
             return Double.parseDouble(value);
     }
 
+    private void setBoolean(String property, boolean value) {
+        properties.setProperty(property, Boolean.toString(value));
+    }
+
+    private boolean getBoolean(String property, boolean defaultValue) {
+        String value = properties.getProperty(property);
+        if (value == null) {
+            return defaultValue;
+        } else {
+            return Boolean.parseBoolean(value);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java
index 8ada0ac..d6895e3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java
@@ -29,12 +29,16 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.IActivity;
@@ -45,15 +49,13 @@ import org.apache.hyracks.api.job.JobSpecification;
 
 public class DotFormatGenerator {
 
-    private DotFormatGenerator() {
-    }
+    private final LogicalOperatorDotVisitor dotVisitor = new LogicalOperatorDotVisitor();
 
     /**
-     * Generates DOT format for {@link JobActivityGraph} that can be visualized
-     * using any DOT format visualizer.
+     * Generates DOT format plan for {@link JobActivityGraph} that can be visualized using any DOT format visualizer.
      *
      * @param jobActivityGraph The job activity graph
-     * @return DOT format
+     * @return DOT format plan
      */
     public static String generate(final JobActivityGraph jobActivityGraph) {
         final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("JobActivityGraph"));
@@ -146,92 +148,74 @@ public class DotFormatGenerator {
     }
 
     /**
-     * Generates DOT format for {@link JobSpecification} that can be visualized
-     * using any DOT format visualizer.
+     * Generates DOT format plan for {@link JobSpecification} that can be visualized using any DOT format visualizer.
      *
      * @param jobSpecification The job specification
-     * @return DOT format
+     * @return DOT format plan
      */
     public static String generate(final JobSpecification jobSpecification) {
         final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("JobSpecification"));
         final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap = jobSpecification.getConnectorMap();
-        final Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> cOp =
+        final Set<Constraint> constraints = jobSpecification.getUserConstraints();
+        Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> cOp =
                 jobSpecification.getConnectorOperatorMap();
-        ConnectorDescriptorId connectorId;
-        IConnectorDescriptor connector;
-        IOperatorDescriptor leftOperator;
-        IOperatorDescriptor rightOperator;
-        DotFormatBuilder.Node sourceNode;
-        DotFormatBuilder.Node destinationNode;
-        String source;
-        String destination;
-        String edgeLabel;
-        for (Map.Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : cOp
-                .entrySet()) {
-            connectorId = entry.getKey();
-            connector = connectorMap.get(connectorId);
-            edgeLabel = connector.getClass().getName().substring(connector.getClass().getName().lastIndexOf(".") + 1);
-            edgeLabel += "-" + connectorId;
-            leftOperator = entry.getValue().getLeft().getLeft();
-            rightOperator = entry.getValue().getRight().getLeft();
-            source = leftOperator.getClass().getName()
-                    .substring(leftOperator.getClass().getName().lastIndexOf(".") + 1);
-            sourceNode =
-                    graphBuilder.createNode(DotFormatBuilder.StringValue.of(leftOperator.getOperatorId().toString()),
-                            DotFormatBuilder.StringValue.of(leftOperator.toString() + "-" + source));
-            destination = rightOperator.getClass().getName()
-                    .substring(rightOperator.getClass().getName().lastIndexOf(".") + 1);
-            destinationNode =
-                    graphBuilder.createNode(DotFormatBuilder.StringValue.of(rightOperator.getOperatorId().toString()),
-                            DotFormatBuilder.StringValue.of(rightOperator.toString() + "-" + destination));
-            graphBuilder.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of(edgeLabel));
-        }
-
+        cOp.forEach((connId, srcAndDest) -> addToGraph(graphBuilder, constraints, connectorMap, connId, srcAndDest));
         return graphBuilder.getDotDocument();
     }
 
     /**
-     * Generates DOT format for {@link ILogicalPlan} that can be visualized
-     * using any DOT format visualizer.
+     * Generates DOT format plan for {@link ILogicalPlan} that can be visualized using any DOT format visualizer.
      *
      * @param plan  The logical plan
-     * @param dotVisitor    The DOT visitor
-     * @return DOT format
-     * @throws AlgebricksException
+     * @param showDetails whether to show the details of the operator like physical properties
+     * @return DOT format plan
+     * @throws AlgebricksException When one operator throws an exception while visiting it.
      */
-    public static String generate(ILogicalPlan plan, LogicalOperatorDotVisitor dotVisitor) throws AlgebricksException {
-        final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("Plan"));
+    public String generate(ILogicalPlan plan, boolean showDetails) throws AlgebricksException {
         ILogicalOperator root = plan.getRoots().get(0).getValue();
-        generateNode(graphBuilder, root, dotVisitor, new HashSet<>());
+        return generate(root, showDetails);
+    }
+
+    /**
+     * Generates DOT format plan considering "startingOp" as the root operator.
+     *
+     * @param startingOp the starting operator
+     * @param showDetails whether to show the details of the operator like physical properties
+     * @return DOT format plan
+     * @throws AlgebricksException When one operator throws an exception while visiting it.
+     */
+    public String generate(ILogicalOperator startingOp, boolean showDetails) throws AlgebricksException {
+        final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("Plan"));
+        generateNode(graphBuilder, startingOp, showDetails, new HashSet<>());
         return graphBuilder.getDotDocument();
     }
 
-    public static void generateNode(DotFormatBuilder dotBuilder, ILogicalOperator op,
-            LogicalOperatorDotVisitor dotVisitor, Set<ILogicalOperator> operatorsVisited) throws AlgebricksException {
-        DotFormatBuilder.StringValue destinationNodeLabel = formatStringOf(op, dotVisitor);
+    private void generateNode(DotFormatBuilder dotBuilder, ILogicalOperator op, boolean showDetails,
+            Set<ILogicalOperator> operatorsVisited) throws AlgebricksException {
+        DotFormatBuilder.StringValue destinationNodeLabel = formatStringOf(op, showDetails);
         DotFormatBuilder.Node destinationNode = dotBuilder
                 .createNode(DotFormatBuilder.StringValue.of(Integer.toString(op.hashCode())), destinationNodeLabel);
         DotFormatBuilder.StringValue sourceNodeLabel;
         DotFormatBuilder.Node sourceNode;
         for (Mutable<ILogicalOperator> child : op.getInputs()) {
-            sourceNodeLabel = formatStringOf(child.getValue(), dotVisitor);
+            sourceNodeLabel = formatStringOf(child.getValue(), showDetails);
             sourceNode = dotBuilder.createNode(
                     DotFormatBuilder.StringValue.of(Integer.toString(child.getValue().hashCode())), sourceNodeLabel);
             dotBuilder.createEdge(sourceNode, destinationNode);
             if (!operatorsVisited.contains(child.getValue())) {
-                generateNode(dotBuilder, child.getValue(), dotVisitor, operatorsVisited);
+                generateNode(dotBuilder, child.getValue(), showDetails, operatorsVisited);
             }
         }
         if (((AbstractLogicalOperator) op).hasNestedPlans()) {
             ILogicalOperator nestedOperator;
             for (ILogicalPlan nestedPlan : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
                 nestedOperator = nestedPlan.getRoots().get(0).getValue();
-                sourceNodeLabel = formatStringOf(nestedOperator, dotVisitor);
+                sourceNodeLabel = formatStringOf(nestedOperator, showDetails);
                 sourceNode = dotBuilder.createNode(
                         DotFormatBuilder.StringValue.of(Integer.toString(nestedOperator.hashCode())), sourceNodeLabel);
                 dotBuilder.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of("subplan"));
                 if (!operatorsVisited.contains(nestedOperator)) {
-                    generateNode(dotBuilder, nestedOperator, dotVisitor, operatorsVisited);
+                    generateNode(dotBuilder, nestedOperator, showDetails, operatorsVisited);
                 }
             }
         }
@@ -246,7 +230,7 @@ public class DotFormatGenerator {
             sourceNode = destinationNode;
             for (int i = 0; i < replicateOperator.getOutputs().size(); i++) {
                 replicateOutput = replicateOperator.getOutputs().get(i).getValue();
-                destinationNodeLabel = formatStringOf(replicateOutput, dotVisitor);
+                destinationNodeLabel = formatStringOf(replicateOutput, showDetails);
                 destinationNode = dotBuilder.createNode(
                         DotFormatBuilder.StringValue.of(Integer.toString(replicateOutput.hashCode())),
                         destinationNodeLabel);
@@ -261,16 +245,52 @@ public class DotFormatGenerator {
         operatorsVisited.add(op);
     }
 
-    private static DotFormatBuilder.StringValue formatStringOf(ILogicalOperator operator,
-            LogicalOperatorDotVisitor dotVisitor) throws AlgebricksException {
-        String formattedString = operator.accept(dotVisitor, null).trim();
-        IPhysicalOperator physicalOperator = ((AbstractLogicalOperator) operator).getPhysicalOperator();
-        if (physicalOperator != null) {
-            formattedString += "\\n" + physicalOperator.toString().trim() + " |" + operator.getExecutionMode() + "|";
-        } else {
-            formattedString += "\\n|" + operator.getExecutionMode() + "|";
-        }
-
+    private DotFormatBuilder.StringValue formatStringOf(ILogicalOperator operator, boolean showDetails)
+            throws AlgebricksException {
+        String formattedString = operator.accept(dotVisitor, showDetails).trim();
         return DotFormatBuilder.StringValue.of(formattedString);
     }
+
+    private static void addToGraph(DotFormatBuilder graph, Set<Constraint> constraints,
+            Map<ConnectorDescriptorId, IConnectorDescriptor> connMap, ConnectorDescriptorId connId,
+            Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> srcAndDest) {
+        IConnectorDescriptor connector = connMap.get(connId);
+        String edgeLabel;
+        edgeLabel = connector.getClass().getName().substring(connector.getClass().getName().lastIndexOf(".") + 1);
+        edgeLabel += "-" + connId;
+        IOperatorDescriptor sourceOp = srcAndDest.getLeft().getLeft();
+        IOperatorDescriptor destOp = srcAndDest.getRight().getLeft();
+        StringBuilder source = new StringBuilder(
+                sourceOp.getClass().getName().substring(sourceOp.getClass().getName().lastIndexOf(".") + 1));
+        StringBuilder destination = new StringBuilder(
+                destOp.getClass().getName().substring(destOp.getClass().getName().lastIndexOf(".") + 1));
+        // constraints
+        for (Constraint constraint : constraints) {
+            LValueConstraintExpression lvalue = constraint.getLValue();
+            if (lvalue.getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) {
+                PartitionCountExpression count = (PartitionCountExpression) lvalue;
+                if (count.getOperatorDescriptorId().equals(sourceOp.getOperatorId())) {
+                    source.append("\n").append(constraint);
+                }
+                if (count.getOperatorDescriptorId().equals(destOp.getOperatorId())) {
+                    destination.append("\n").append(constraint);
+                }
+            } else if (lvalue.getTag() == ConstraintExpression.ExpressionTag.PARTITION_LOCATION) {
+                PartitionLocationExpression location = (PartitionLocationExpression) lvalue;
+                if (location.getOperatorDescriptorId().equals(sourceOp.getOperatorId())) {
+                    source.append("\n").append(constraint);
+                }
+                if (location.getOperatorDescriptorId().equals(destOp.getOperatorId())) {
+                    destination.append("\n").append(constraint);
+                }
+            }
+        }
+        DotFormatBuilder.Node sourceNode =
+                graph.createNode(DotFormatBuilder.StringValue.of(sourceOp.getOperatorId().toString()),
+                        DotFormatBuilder.StringValue.of(sourceOp.toString() + "-" + source));
+        DotFormatBuilder.Node destinationNode =
+                graph.createNode(DotFormatBuilder.StringValue.of(destOp.getOperatorId().toString()),
+                        DotFormatBuilder.StringValue.of(destOp.toString() + "-" + destination));
+        graph.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of(edgeLabel));
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 2cb2d35..113d205 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -18,14 +18,20 @@
  */
 package org.apache.hyracks.algebricks.core.utils;
 
+import static org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType.LOCAL_GROUPING_PROPERTY;
+import static org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType.LOCAL_ORDER_PROPERTY;
+
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -35,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -62,9 +69,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOpe
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
-public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String, Void> {
+public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String, Boolean> {
 
     private final StringBuilder stringBuilder;
 
@@ -82,161 +94,214 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
     }
 
     @Override
-    public String visitAggregateOperator(AggregateOperator op, Void noArgs) throws AlgebricksException {
+    public String visitAggregateOperator(AggregateOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("aggregate ").append(str(op.getVariables())).append(" <- ");
-        pprintExprList(op.getExpressions());
+        printExprList(op.getExpressions());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitRunningAggregateOperator(RunningAggregateOperator op, Void noArgs) throws AlgebricksException {
+    public String visitRunningAggregateOperator(RunningAggregateOperator op, Boolean showDetails)
+            throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("running-aggregate ").append(str(op.getVariables())).append(" <- ");
-        pprintExprList(op.getExpressions());
+        printExprList(op.getExpressions());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void noArgs) throws AlgebricksException {
+    public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         stringBuilder.append("empty-tuple-source");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitGroupByOperator(GroupByOperator op, Void noArgs) throws AlgebricksException {
+    public String visitGroupByOperator(GroupByOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("group by").append(op.isGroupAll() ? " (all)" : "").append(" (");
-        pprintVeList(op.getGroupByList());
+        printVariableAndExprList(op.getGroupByList());
         stringBuilder.append(") decor (");
-        pprintVeList(op.getDecorList());
+        printVariableAndExprList(op.getDecorList());
         stringBuilder.append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitDistinctOperator(DistinctOperator op, Void noArgs) throws AlgebricksException {
+    public String visitDistinctOperator(DistinctOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         stringBuilder.append("distinct (");
-        pprintExprList(op.getExpressions());
+        printExprList(op.getExpressions());
         stringBuilder.append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitInnerJoinOperator(InnerJoinOperator op, Void noArgs) throws AlgebricksException {
+    public String visitInnerJoinOperator(InnerJoinOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("join (").append(op.getCondition().getValue().toString()).append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void noArgs) throws AlgebricksException {
+    public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("left outer join (").append(op.getCondition().getValue().toString()).append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void noArgs) throws AlgebricksException {
+    public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Boolean showDetails)
+            throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("nested tuple source");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitOrderOperator(OrderOperator op, Void noArgs) throws AlgebricksException {
+    public String visitOrderOperator(OrderOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("order ");
         for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
             if (op.getTopK() != -1) {
                 stringBuilder.append("(topK: ").append(op.getTopK()).append(") ");
             }
-            String fst = getOrderString(p.first);
-            stringBuilder.append("(").append(fst).append(", ").append(p.second.getValue().toString()).append(") ");
+            stringBuilder.append("(");
+            switch (p.first.getKind()) {
+                case ASC:
+                    stringBuilder.append("ASC");
+                    break;
+                case DESC:
+                    stringBuilder.append("DESC");
+                    break;
+                default:
+                    final Mutable<ILogicalExpression> expressionRef = p.first.getExpressionRef();
+                    stringBuilder.append(expressionRef == null ? "null" : expressionRef.toString());
+            }
+            stringBuilder.append(", ").append(p.second.getValue().toString()).append(") ");
         }
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
-    private String getOrderString(OrderOperator.IOrder first) {
-        switch (first.getKind()) {
-            case ASC:
-                return "ASC";
-            case DESC:
-                return "DESC";
-            default:
-                return first.getExpressionRef().toString();
-        }
-    }
-
     @Override
-    public String visitAssignOperator(AssignOperator op, Void noArgs) throws AlgebricksException {
+    public String visitAssignOperator(AssignOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("assign ").append(str(op.getVariables())).append(" <- ");
-        pprintExprList(op.getExpressions());
+        printExprList(op.getExpressions());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitWriteOperator(WriteOperator op, Void noArgs) throws AlgebricksException {
+    public String visitWriteOperator(WriteOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         stringBuilder.append("write ");
-        pprintExprList(op.getExpressions());
+        printExprList(op.getExpressions());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitDistributeResultOperator(DistributeResultOperator op, Void noArgs) throws AlgebricksException {
+    public String visitDistributeResultOperator(DistributeResultOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         stringBuilder.append("distribute result ");
-        pprintExprList(op.getExpressions());
+        printExprList(op.getExpressions());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitWriteResultOperator(WriteResultOperator op, Void noArgs) throws AlgebricksException {
+    public String visitWriteResultOperator(WriteResultOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         stringBuilder.append("load ").append(str(op.getDataSource())).append(" from ")
                 .append(op.getPayloadExpression().getValue().toString()).append(" partitioned by ");
-        pprintExprList(op.getKeyExpressions());
+        printExprList(op.getKeyExpressions());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitSelectOperator(SelectOperator op, Void noArgs) throws AlgebricksException {
+    public String visitSelectOperator(SelectOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         stringBuilder.append("select (").append(op.getCondition().getValue().toString()).append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitProjectOperator(ProjectOperator op, Void noArgs) throws AlgebricksException {
+    public String visitProjectOperator(ProjectOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("project ").append("(").append(op.getVariables()).append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitSubplanOperator(SubplanOperator op, Void noArgs) throws AlgebricksException {
+    public String visitSubplanOperator(SubplanOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("subplan {}");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitUnionOperator(UnionAllOperator op, Void noArgs) throws AlgebricksException {
+    public String visitUnionOperator(UnionAllOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("union");
         for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op.getVariableMappings()) {
             stringBuilder.append(" (").append(v.first).append(", ").append(v.second).append(", ").append(v.third)
                     .append(")");
         }
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitIntersectOperator(IntersectOperator op, Void noArgs) throws AlgebricksException {
+    public String visitIntersectOperator(IntersectOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("intersect (");
         stringBuilder.append('[');
@@ -261,154 +326,183 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
             stringBuilder.append(']');
         }
         stringBuilder.append("])");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitUnnestOperator(UnnestOperator op, Void noArgs) throws AlgebricksException {
+    public String visitUnnestOperator(UnnestOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("unnest ").append(op.getVariable());
         if (op.getPositionalVariable() != null) {
             stringBuilder.append(" at ").append(op.getPositionalVariable());
         }
         stringBuilder.append(" <- ").append(op.getExpressionRef().getValue().toString());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void noArgs) throws AlgebricksException {
+    public String visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Boolean showDetails)
+            throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("outer-unnest ").append(op.getVariable());
         if (op.getPositionalVariable() != null) {
             stringBuilder.append(" at ").append(op.getPositionalVariable());
         }
         stringBuilder.append(" <- ").append(op.getExpressionRef().getValue().toString());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitUnnestMapOperator(UnnestMapOperator op, Void noArgs) throws AlgebricksException {
+    public String visitUnnestMapOperator(UnnestMapOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
-        printAbstractUnnestMapOperator(op, "unnest-map");
-        appendSelectConditionInformation(stringBuilder, op.getSelectCondition());
-        appendLimitInformation(stringBuilder, op.getOutputLimit());
+        printAbstractUnnestMapOperator(op, "unnest-map", showDetails);
+        appendSelectConditionInformation(op.getSelectCondition());
+        appendLimitInformation(op.getOutputLimit());
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void noArgs)
+    public String visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean showDetails)
             throws AlgebricksException {
         stringBuilder.setLength(0);
-        printAbstractUnnestMapOperator(op, "left-outer-unnest-map");
+        printAbstractUnnestMapOperator(op, "left-outer-unnest-map", showDetails);
         return stringBuilder.toString();
     }
 
-    private void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, String opSignature) {
+    private void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, String opSignature, boolean show) {
         stringBuilder.append(opSignature).append(" ").append(op.getVariables()).append(" <- ")
                 .append(op.getExpressionRef().getValue().toString());
-        appendFilterInformation(stringBuilder, op.getMinFilterVars(), op.getMaxFilterVars());
+        appendFilterInformation(op.getMinFilterVars(), op.getMaxFilterVars());
+        appendSchema(op, show);
+        appendAnnotations(op, show);
+        appendPhysicalOperatorInfo(op, show);
     }
 
     @Override
-    public String visitDataScanOperator(DataSourceScanOperator op, Void noArgs) throws AlgebricksException {
+    public String visitDataScanOperator(DataSourceScanOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("data-scan ").append(op.getProjectVariables()).append("<-").append(op.getVariables())
                 .append(" <- ").append(op.getDataSource());
-        appendFilterInformation(stringBuilder, op.getMinFilterVars(), op.getMaxFilterVars());
-        appendSelectConditionInformation(stringBuilder, op.getSelectCondition());
-        appendLimitInformation(stringBuilder, op.getOutputLimit());
+        appendFilterInformation(op.getMinFilterVars(), op.getMaxFilterVars());
+        appendSelectConditionInformation(op.getSelectCondition());
+        appendLimitInformation(op.getOutputLimit());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
-    private void appendFilterInformation(StringBuilder plan, List<LogicalVariable> minFilterVars,
-            List<LogicalVariable> maxFilterVars) {
+    private void appendFilterInformation(List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars) {
         if (minFilterVars != null || maxFilterVars != null) {
-            plan.append(" with filter on");
+            stringBuilder.append(" with filter on");
         }
         if (minFilterVars != null) {
-            plan.append(" min:").append(minFilterVars);
+            stringBuilder.append(" min:").append(minFilterVars);
         }
         if (maxFilterVars != null) {
-            plan.append(" max:").append(maxFilterVars);
+            stringBuilder.append(" max:").append(maxFilterVars);
         }
     }
 
-    private Void appendSelectConditionInformation(StringBuilder plan, Mutable<ILogicalExpression> condition)
-            throws AlgebricksException {
+    private void appendSelectConditionInformation(Mutable<ILogicalExpression> condition) throws AlgebricksException {
         if (condition != null) {
-            plan.append(" condition:").append(condition.getValue().toString());
+            stringBuilder.append(" condition:").append(condition.getValue().toString());
         }
-        return null;
     }
 
-    private Void appendLimitInformation(StringBuilder plan, long outputLimit) throws AlgebricksException {
+    private void appendLimitInformation(long outputLimit) throws AlgebricksException {
         if (outputLimit >= 0) {
-            plan.append(" limit:").append(String.valueOf(outputLimit));
+            stringBuilder.append(" limit:").append(String.valueOf(outputLimit));
         }
-        return null;
     }
 
     @Override
-    public String visitLimitOperator(LimitOperator op, Void noArgs) throws AlgebricksException {
+    public String visitLimitOperator(LimitOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("limit ").append(op.getMaxObjects().getValue().toString());
         ILogicalExpression offset = op.getOffset().getValue();
         if (offset != null) {
             stringBuilder.append(", ").append(offset.toString());
         }
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitExchangeOperator(ExchangeOperator op, Void noArgs) throws AlgebricksException {
+    public String visitExchangeOperator(ExchangeOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("exchange");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitScriptOperator(ScriptOperator op, Void noArgs) throws AlgebricksException {
+    public String visitScriptOperator(ScriptOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("script (in: ").append(op.getInputVariables()).append(") (out: ")
                 .append(op.getOutputVariables()).append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitReplicateOperator(ReplicateOperator op, Void noArgs) throws AlgebricksException {
+    public String visitReplicateOperator(ReplicateOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("replicate");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitSplitOperator(SplitOperator op, Void noArgs) throws AlgebricksException {
+    public String visitSplitOperator(SplitOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         Mutable<ILogicalExpression> branchingExpression = op.getBranchingExpression();
         stringBuilder.append("split ").append(branchingExpression.getValue().toString());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitMaterializeOperator(MaterializeOperator op, Void noArgs) throws AlgebricksException {
+    public String visitMaterializeOperator(MaterializeOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("materialize");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void noArgs)
-            throws AlgebricksException {
+    public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         String header = getIndexOpString(op.getOperation());
         stringBuilder.append(header).append(str(op.getDataSource())).append(" from record: ")
                 .append(op.getPayloadExpression().getValue().toString());
         if (op.getAdditionalNonFilteringExpressions() != null) {
             stringBuilder.append(", meta: ");
-            pprintExprList(op.getAdditionalNonFilteringExpressions());
+            printExprList(op.getAdditionalNonFilteringExpressions());
         }
         stringBuilder.append(" partitioned by ");
-        pprintExprList(op.getPrimaryKeyExpressions());
+        printExprList(op.getPrimaryKeyExpressions());
         if (op.getOperation() == Kind.UPSERT) {
             stringBuilder.append(" out: ([record-before-upsert:").append(op.getBeforeOpRecordVar());
             if (op.getBeforeOpAdditionalNonFilteringVars() != null) {
@@ -419,27 +513,32 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
         if (op.isBulkload()) {
             stringBuilder.append(" [bulkload]");
         }
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void noArgs)
-            throws AlgebricksException {
+    public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         String header = getIndexOpString(op.getOperation());
         stringBuilder.append(header).append(op.getIndexName()).append(" on ")
                 .append(str(op.getDataSourceIndex().getDataSource())).append(" from ");
         if (op.getOperation() == Kind.UPSERT) {
             stringBuilder.append(" replace:");
-            pprintExprList(op.getPrevSecondaryKeyExprs());
+            printExprList(op.getPrevSecondaryKeyExprs());
             stringBuilder.append(" with:");
-            pprintExprList(op.getSecondaryKeyExpressions());
+            printExprList(op.getSecondaryKeyExpressions());
         } else {
-            pprintExprList(op.getSecondaryKeyExpressions());
+            printExprList(op.getSecondaryKeyExpressions());
         }
         if (op.isBulkload()) {
             stringBuilder.append(" [bulkload]");
         }
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
@@ -452,60 +551,143 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
             case UPSERT:
                 return "upsert into ";
         }
-        return null;
+        return "";
     }
 
     @Override
-    public String visitTokenizeOperator(TokenizeOperator op, Void noArgs) throws AlgebricksException {
+    public String visitTokenizeOperator(TokenizeOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("tokenize ").append(str(op.getTokenizeVars())).append(" <- ");
-        pprintExprList(op.getSecondaryKeyExpressions());
+        printExprList(op.getSecondaryKeyExpressions());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitSinkOperator(SinkOperator op, Void noArgs) throws AlgebricksException {
+    public String visitSinkOperator(SinkOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
         stringBuilder.append("sink");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
     @Override
-    public String visitDelegateOperator(DelegateOperator op, Void noArgs) throws AlgebricksException {
+    public String visitDelegateOperator(DelegateOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append(op.toString());
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
+        return stringBuilder.toString();
+    }
+
+    @Override
+    public String visitForwardOperator(ForwardOperator op, Boolean showDetails) throws AlgebricksException {
+        stringBuilder.setLength(0);
+        stringBuilder.append("forward(").append(op.getRangeMapExpression().getValue().toString()).append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
         return stringBuilder.toString();
     }
 
-    private void pprintExprList(List<Mutable<ILogicalExpression>> expressions) {
+    private void printExprList(List<Mutable<ILogicalExpression>> expressions) {
+        stringBuilder.append("[");
+        expressions.forEach(exprRef -> stringBuilder.append(exprRef.getValue().toString()).append(", "));
+        stringBuilder.append("]");
+    }
+
+    private void printVariableAndExprList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> variableExprList) {
         stringBuilder.append("[");
         boolean first = true;
-        for (Mutable<ILogicalExpression> exprRef : expressions) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> variableExpressionPair : variableExprList) {
             if (first) {
                 first = false;
             } else {
-                stringBuilder.append(", ");
+                stringBuilder.append("; ");
+            }
+            if (variableExpressionPair.first != null) {
+                stringBuilder.append(variableExpressionPair.first).append(" := ").append(variableExpressionPair.second);
+            } else {
+                stringBuilder.append(variableExpressionPair.second.getValue().toString());
             }
-            stringBuilder.append(exprRef.getValue().toString());
         }
         stringBuilder.append("]");
     }
 
-    private void pprintVeList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList) {
-        stringBuilder.append("[");
-        boolean fst = true;
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : vePairList) {
-            if (fst) {
-                fst = false;
-            } else {
-                stringBuilder.append("; ");
+    private void appendSchema(AbstractLogicalOperator op, boolean show) {
+        if (show) {
+            stringBuilder.append("\\nSchema: ");
+            final List<LogicalVariable> schema = op.getSchema();
+            stringBuilder.append(schema == null ? "null" : schema);
+        }
+    }
+
+    private void appendAnnotations(AbstractLogicalOperator op, boolean show) {
+        if (show) {
+            final Map<String, Object> annotations = op.getAnnotations();
+            if (!annotations.isEmpty()) {
+                stringBuilder.append("\\nAnnotations: ").append(annotations);
             }
-            if (ve.first != null) {
-                stringBuilder.append(ve.first).append(" := ").append(ve.second);
-            } else {
-                stringBuilder.append(ve.second.getValue().toString());
+        }
+    }
+
+    private void appendPhysicalOperatorInfo(AbstractLogicalOperator op, boolean show) {
+        IPhysicalOperator physicalOp = op.getPhysicalOperator();
+        stringBuilder.append("\\n").append(physicalOp == null ? "null" : physicalOp.toString().trim());
+        stringBuilder.append(", Exec: ").append(op.getExecutionMode());
+        if (show) {
+            IPhysicalPropertiesVector properties = physicalOp == null ? null : physicalOp.getDeliveredProperties();
+            List<ILocalStructuralProperty> localProp = properties == null ? null : properties.getLocalProperties();
+            IPartitioningProperty partitioningProp = properties == null ? null : properties.getPartitioningProperty();
+            if (localProp != null) {
+                stringBuilder.append("\\nProperties in each partition: [");
+                for (ILocalStructuralProperty property : localProp) {
+                    if (property == null) {
+                        stringBuilder.append("null, ");
+                    } else if (property.getPropertyType() == LOCAL_ORDER_PROPERTY) {
+                        stringBuilder.append("ordered by ");
+                    } else if (property.getPropertyType() == LOCAL_GROUPING_PROPERTY) {
+                        stringBuilder.append("group by ");
+                    }
+                    stringBuilder.append(property).append(", ");
+                }
+                stringBuilder.append("]");
+            }
+
+            if (partitioningProp != null) {
+                stringBuilder.append("\\n").append(partitioningProp.getPartitioningType()).append(":");
+                INodeDomain nodeDomain = partitioningProp.getNodeDomain();
+                stringBuilder.append("\\n ");
+                if (nodeDomain != null && nodeDomain.cardinality() != null) {
+                    stringBuilder.append(nodeDomain.cardinality()).append(" partitions. ");
+                }
+                switch (partitioningProp.getPartitioningType()) {
+                    case BROADCAST:
+                        stringBuilder.append("Data is broadcast to partitions.");
+                        break;
+                    case RANDOM:
+                        stringBuilder.append("Data is randomly partitioned.");
+                        break;
+                    case ORDERED_PARTITIONED:
+                        stringBuilder.append("Data is orderly partitioned via a range.");
+                        break;
+                    case UNORDERED_PARTITIONED:
+                        stringBuilder.append("Data is hash partitioned.");
+                        break;
+                    case UNPARTITIONED:
+                        stringBuilder.append("Data is in one place.");
+                }
+                if (nodeDomain instanceof DefaultNodeGroupDomain) {
+                    DefaultNodeGroupDomain nd = (DefaultNodeGroupDomain) nodeDomain;
+                    stringBuilder.append("\\n").append(nd);
+                }
             }
         }
-        stringBuilder.append("]");
+
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 6f7f86a..cdab2f4 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -21,9 +21,11 @@ package org.apache.hyracks.algebricks.rewriter.rules;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -40,28 +42,41 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreSortedDistinctByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SequentialMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
@@ -90,13 +105,20 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
 import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
 
 public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
     private static final String HASH_MERGE = "hash_merge";
     private static final String TRUE_CONSTANT = "true";
     private PhysicalOptimizationConfig physicalOptimizationConfig;
+    private final FunctionIdentifier rangeMapFunction;
+    private final FunctionIdentifier localSamplingFun;
+
+    public EnforceStructuralPropertiesRule(FunctionIdentifier rangeMapFunction, FunctionIdentifier localSamplingFun) {
+        this.rangeMapFunction = rangeMapFunction;
+        this.localSamplingFun = localSamplingFun;
+    }
 
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
@@ -204,6 +226,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
         boolean changed = false;
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+
         optimizeUsingConstraintsAndEquivClasses(op);
         PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required, context);
         IPhysicalPropertiesVector[] reqdProperties = null;
@@ -214,26 +237,19 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
         // compute properties and figure out the domain
         INodeDomain childrenDomain = null;
-        {
-            int j = 0;
-            for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-                AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
-                // recursive call
-                if (physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context)) {
-                    changed = true;
-                }
-                child.computeDeliveredPhysicalProperties(context);
-                IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
-                if (childrenDomain == null) {
-                    childrenDomain = delivered.getPartitioningProperty().getNodeDomain();
-                } else {
-                    INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain();
-                    if (!childrenDomain.sameAs(dom2)) {
-                        childrenDomain = context.getComputationNodeDomain();
-                    }
-                }
-                j++;
+        int j = 0;
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+            changed |= physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context);
+            child.computeDeliveredPhysicalProperties(context);
+            IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
+            INodeDomain childDomain = delivered.getPartitioningProperty().getNodeDomain();
+            if (childrenDomain == null) {
+                childrenDomain = delivered.getPartitioningProperty().getNodeDomain();
+            } else if (!childrenDomain.sameAs(childDomain)) {
+                childrenDomain = context.getComputationNodeDomain();
             }
+            j++;
         }
 
         if (reqdProperties != null) {
@@ -252,7 +268,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
         IPartitioningProperty firstDeliveredPartitioning = null;
         // Enforce data properties in a top-down manner.
-        for (int j = 0; j < op.getInputs().size(); j++) {
+        for (j = 0; j < op.getInputs().size(); j++) {
             // Starts from a partitioning-compatible child if any to loop over all children.
             int childIndex = (j + startChildIndex) % op.getInputs().size();
             IPhysicalPropertiesVector requiredProperty = reqdProperties[childIndex];
@@ -555,6 +571,17 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         return new MutableObject<ILogicalOperator>(oo);
     }
 
+    /**
+     * Adds exchange operators (connectors) between {@code op} & its child at index {@code childIdx}.
+     * @param op the parent operator that is requiring a specific kind of connector at its child
+     * @param i the child index where we want to have the connector
+     * @param pp the required partitioning property at that child (i.e. the required connector)
+     * @param required the physical properties required at that child (partitioning + local properties)
+     * @param deliveredByChild the physical properties delivered by that child (partitioning + local properties)
+     * @param domain the destination domain of nodes that we want the connector to connect to
+     * @param context {@link IOptimizationContext}
+     * @throws AlgebricksException
+     */
     private void addPartitioningEnforcers(ILogicalOperator op, int i, IPartitioningProperty pp,
             IPhysicalPropertiesVector required, IPhysicalPropertiesVector deliveredByChild, INodeDomain domain,
             IOptimizationContext context) throws AlgebricksException {
@@ -562,52 +589,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
             IPhysicalOperator pop;
             switch (pp.getPartitioningType()) {
                 case UNPARTITIONED: {
-                    List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
-                    if (ordCols.isEmpty()) {
-                        pop = new RandomMergeExchangePOperator();
-                    } else {
-                        if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
-                            IRangeMap rangeMap =
-                                    (IRangeMap) op.getAnnotations().get(OperatorAnnotations.USE_RANGE_CONNECTOR);
-                            pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap);
-                        } else {
-                            OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
-                            sortColumns = ordCols.toArray(sortColumns);
-                            pop = new SortMergeExchangePOperator(sortColumns);
-                        }
-                    }
+                    pop = createMergingConnector(op, domain, deliveredByChild);
                     break;
                 }
                 case UNORDERED_PARTITIONED: {
-                    List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet());
-                    String hashMergeHint = (String) context.getMetadataProvider().getConfig().get(HASH_MERGE);
-                    if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
-                        pop = new HashPartitionExchangePOperator(varList, domain);
-                        break;
-                    }
-                    List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
-                    List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
-                    boolean propWasSet = false;
-                    pop = null;
-                    if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
-                        AbstractLogicalOperator c = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
-                        Map<LogicalVariable, EquivalenceClass> ecs = context.getEquivalenceClassMap(c);
-                        List<FunctionalDependency> fds = context.getFDList(c);
-                        if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
-                            List<OrderColumn> orderColumns =
-                                    getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
-                            pop = new HashPartitionMergeExchangePOperator(orderColumns, varList, domain);
-                            propWasSet = true;
-                        }
-                    }
-                    if (!propWasSet) {
-                        pop = new HashPartitionExchangePOperator(varList, domain);
-                    }
+                    pop = createHashConnector(context, deliveredByChild, domain, required, pp, i, op);
                     break;
                 }
                 case ORDERED_PARTITIONED: {
-                    pop = new RangePartitionExchangePOperator(((OrderedPartitionedProperty) pp).getOrderColumns(),
-                            domain, null);
+                    pop = createRangePartitionerConnector((AbstractLogicalOperator) op, domain, pp, i, context);
                     break;
                 }
                 case BROADCAST: {
@@ -640,6 +630,264 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         }
     }
 
+    private IPhysicalOperator createMergingConnector(ILogicalOperator parentOp, INodeDomain domain,
+            IPhysicalPropertiesVector deliveredByChild) {
+        IPhysicalOperator mergingConnector;
+        List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
+        if (ordCols.isEmpty()) {
+            IPartitioningProperty partitioningDeliveredByChild = deliveredByChild.getPartitioningProperty();
+            if (partitioningDeliveredByChild.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED) {
+                mergingConnector = new SequentialMergeExchangePOperator();
+            } else {
+                mergingConnector = new RandomMergeExchangePOperator();
+            }
+        } else {
+            if (parentOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
+                RangeMap rangeMap = (RangeMap) parentOp.getAnnotations().get(OperatorAnnotations.USE_STATIC_RANGE);
+                mergingConnector = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap);
+            } else {
+                OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
+                sortColumns = ordCols.toArray(sortColumns);
+                mergingConnector = new SortMergeExchangePOperator(sortColumns);
+            }
+        }
+        return mergingConnector;
+    }
+
+    private IPhysicalOperator createHashConnector(IOptimizationContext ctx, IPhysicalPropertiesVector deliveredByChild,
+            INodeDomain domain, IPhysicalPropertiesVector requiredAtChild, IPartitioningProperty rqdPartitioning,
+            int childIndex, ILogicalOperator parentOp) {
+        IPhysicalOperator hashConnector;
+        List<LogicalVariable> vars = new ArrayList<>(((UnorderedPartitionedProperty) rqdPartitioning).getColumnSet());
+        String hashMergeHint = (String) ctx.getMetadataProvider().getConfig().get(HASH_MERGE);
+        if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
+            hashConnector = new HashPartitionExchangePOperator(vars, domain);
+            return hashConnector;
+        }
+        List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
+        List<ILocalStructuralProperty> reqdLocals = requiredAtChild.getLocalProperties();
+        boolean propWasSet = false;
+        hashConnector = null;
+        if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
+            AbstractLogicalOperator c = (AbstractLogicalOperator) parentOp.getInputs().get(childIndex).getValue();
+            Map<LogicalVariable, EquivalenceClass> ecs = ctx.getEquivalenceClassMap(c);
+            List<FunctionalDependency> fds = ctx.getFDList(c);
+            if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
+                List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
+                hashConnector = new HashPartitionMergeExchangePOperator(orderColumns, vars, domain);
+                propWasSet = true;
+            }
+        }
+        if (!propWasSet) {
+            hashConnector = new HashPartitionExchangePOperator(vars, domain);
+        }
+        return hashConnector;
+    }
+
+    /**
+     * Creates a range-based exchange operator.
+     * @param parentOp the operator requiring range-based partitioner to have input tuples repartitioned using a range
+     * @param domain the target node domain of the range-based partitioner
+     * @param requiredPartitioning {@see OrderedPartitionedProperty}
+     * @param childIndex the index of the child at which the required partitioning is needed
+     * @param ctx optimization context
+     * @return a range-based exchange operator
+     * @throws AlgebricksException
+     */
+    private IPhysicalOperator createRangePartitionerConnector(AbstractLogicalOperator parentOp, INodeDomain domain,
+            IPartitioningProperty requiredPartitioning, int childIndex, IOptimizationContext ctx)
+            throws AlgebricksException {
+        // options for range partitioning: 1. static range map, 2. dynamic range map computed at run time
+        List<OrderColumn> partitioningColumns = ((OrderedPartitionedProperty) requiredPartitioning).getOrderColumns();
+        if (parentOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
+            // TODO(ali): static range map implementation should be fixed to require ORDERED_PARTITION and come here.
+            RangeMap rangeMap = (RangeMap) parentOp.getAnnotations().get(OperatorAnnotations.USE_STATIC_RANGE);
+            return new RangePartitionExchangePOperator(partitioningColumns, domain, rangeMap);
+        } else {
+            return createDynamicRangePartitionExchangePOperator(parentOp, ctx, domain, partitioningColumns, childIndex);
+        }
+    }
+
+    private IPhysicalOperator createDynamicRangePartitionExchangePOperator(AbstractLogicalOperator parentOp,
+            IOptimizationContext ctx, INodeDomain targetDomain, List<OrderColumn> partitioningColumns, int childIndex)
+            throws AlgebricksException {
+        SourceLocation sourceLoc = parentOp.getSourceLocation();
+        // #1. create the replicate operator and add it above the source op feeding parent operator
+        ReplicateOperator replicateOp = createReplicateOperator(parentOp.getInputs().get(childIndex), ctx, sourceLoc);
+
+        // these two exchange ops are needed so that the parents of replicate stay the same during later optimizations.
+        // This is because replicate operator has references to its parents. If any later optimizations add new parents,
+        // then replicate would still point to the old ones.
+        MutableObject<ILogicalOperator> replicateOpRef = new MutableObject<>(replicateOp);
+        ExchangeOperator exchToLocalAgg = createOneToOneExchangeOp(replicateOpRef, ctx);
+        ExchangeOperator exchToForward = createOneToOneExchangeOp(replicateOpRef, ctx);
+        MutableObject<ILogicalOperator> exchToLocalAggRef = new MutableObject<>(exchToLocalAgg);
+        MutableObject<ILogicalOperator> exchToForwardRef = new MutableObject<>(exchToForward);
+
+        // add the exchange--to-forward at output 0, the exchange-to-local-aggregate at output 1
+        replicateOp.getOutputs().add(exchToForwardRef);
+        replicateOp.getOutputs().add(exchToLocalAggRef);
+        // materialize the data to be able to re-read the data again after sampling is done
+        replicateOp.getOutputMaterializationFlags()[0] = true;
+
+        // #2. create the aggregate operators and their sampling functions
+        // $$samplingResultVar = local_samplingFun($$partitioning_column)
+        // $$rangeMapResultVar = global_rangeMapFun($$samplingResultVar)
+        List<LogicalVariable> samplingResultVar = new ArrayList<>(1);
+        List<LogicalVariable> rangeMapResultVar = new ArrayList<>(1);
+        List<Mutable<ILogicalExpression>> samplingFun = new ArrayList<>(1);
+        List<Mutable<ILogicalExpression>> rangeMapFun = new ArrayList<>(1);
+
+        createAggregateFunction(ctx, samplingResultVar, samplingFun, rangeMapResultVar, rangeMapFun,
+                targetDomain.cardinality(), partitioningColumns, sourceLoc);
+
+        AggregateOperator localAggOp =
+                createAggregate(samplingResultVar, false, samplingFun, exchToLocalAggRef, ctx, sourceLoc);
+        MutableObject<ILogicalOperator> localAgg = new MutableObject<>(localAggOp);
+        AggregateOperator globalAggOp = createAggregate(rangeMapResultVar, true, rangeMapFun, localAgg, ctx, sourceLoc);
+        MutableObject<ILogicalOperator> globalAgg = new MutableObject<>(globalAggOp);
+
+        // #3. create the forward operator
+        String rangeMapKey = UUID.randomUUID().toString();
+        LogicalVariable rangeMapVar = rangeMapResultVar.get(0);
+        ForwardOperator forward = createForward(rangeMapKey, rangeMapVar, exchToForwardRef, globalAgg, ctx, sourceLoc);
+        MutableObject<ILogicalOperator> forwardRef = new MutableObject<>(forward);
+
+        // replace the old input of parentOp requiring the range partitioning with the new forward op
+        parentOp.getInputs().set(childIndex, forwardRef);
+        parentOp.recomputeSchema();
+        ctx.computeAndSetTypeEnvironmentForOperator(parentOp);
+
+        return new RangePartitionExchangePOperator(partitioningColumns, rangeMapKey, targetDomain);
+    }
+
+    private static ReplicateOperator createReplicateOperator(Mutable<ILogicalOperator> inputOperator,
+            IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+        ReplicateOperator replicateOperator = new ReplicateOperator(2);
+        replicateOperator.setPhysicalOperator(new ReplicatePOperator());
+        replicateOperator.setSourceLocation(sourceLocation);
+        replicateOperator.getInputs().add(inputOperator);
+        OperatorManipulationUtil.setOperatorMode(replicateOperator);
+        replicateOperator.recomputeSchema();
+        context.computeAndSetTypeEnvironmentForOperator(replicateOperator);
+        return replicateOperator;
+    }
+
+    /**
+     * Creates the sampling expressions and embeds them in {@code localAggFunctions} & {@code globalAggFunctions}. Also,
+     * creates the variables which will hold the result of each one.
+     * {@code localResultVariables},{@code localAggFunctions},{@code globalResultVariables} & {@code globalAggFunctions}
+     * will be used when creating the corresponding aggregate operators.
+     * @param context used to get new variables which will be assigned the samples & the range map
+     * @param localResultVariables the variable to which the stats (e.g. samples) info is assigned
+     * @param localAggFunctions the local sampling expression is added to this list
+     * @param globalResultVariables the variable to which the range map is assigned
+     * @param globalAggFunctions the expression generating a range map is added to this list
+     * @param numPartitions passed to the expression generating a range map to know how many split points are needed
+     * @param partFields the fields based on which the partitioner partitions the tuples, also sampled fields
+     * @param sourceLocation source location
+     */
+    private void createAggregateFunction(IOptimizationContext context, List<LogicalVariable> localResultVariables,
+            List<Mutable<ILogicalExpression>> localAggFunctions, List<LogicalVariable> globalResultVariables,
+            List<Mutable<ILogicalExpression>> globalAggFunctions, int numPartitions, List<OrderColumn> partFields,
+            SourceLocation sourceLocation) {
+        // prepare the arguments of the local sampling function: sampled fields
+        List<Mutable<ILogicalExpression>> sampledFields = new ArrayList<>(partFields.size());
+        partFields.forEach(f -> {
+            AbstractLogicalExpression sampledField = new VariableReferenceExpression(f.getColumn());
+            sampledField.setSourceLocation(sourceLocation);
+            sampledFields.add(new MutableObject<>(sampledField));
+        });
+
+        // local info
+        IFunctionInfo samplingFun = context.getMetadataProvider().lookupFunction(localSamplingFun);
+        AbstractFunctionCallExpression samplingExp =
+                new AggregateFunctionCallExpression(samplingFun, false, sampledFields);
+        samplingExp.setSourceLocation(sourceLocation);
+        LogicalVariable samplingResultVar = context.newVar();
+        localResultVariables.add(samplingResultVar);
+        localAggFunctions.add(new MutableObject<>(samplingExp));
+        Object[] samplingParam = { context.getPhysicalOptimizationConfig().getSortSamples() };
+        samplingExp.setOpaqueParameters(samplingParam);
+
+        // prepare the argument of the global range map generator function: the result of the local function
+        List<Mutable<ILogicalExpression>> arg = new ArrayList<>(1);
+        AbstractLogicalExpression samplingResultVarExp = new VariableReferenceExpression(samplingResultVar);
+        samplingResultVarExp.setSourceLocation(sourceLocation);
+        arg.add(new MutableObject<>(samplingResultVarExp));
+
+        // global info
+        IFunctionInfo rangeMapFun = context.getMetadataProvider().lookupFunction(rangeMapFunction);
+        AbstractFunctionCallExpression rangeMapExp = new AggregateFunctionCallExpression(rangeMapFun, true, arg);
+        rangeMapExp.setSourceLocation(sourceLocation);
+        globalResultVariables.add(context.newVar());
+        globalAggFunctions.add(new MutableObject<>(rangeMapExp));
+
+        int i = 0;
+        boolean[] ascendingFlags = new boolean[partFields.size()];
+        for (OrderColumn column : partFields) {
+            ascendingFlags[i] = column.getOrder() == OrderOperator.IOrder.OrderKind.ASC;
+            i++;
+        }
+        rangeMapExp.setOpaqueParameters(new Object[] { numPartitions, ascendingFlags });
+    }
+
+    /**
+     * Creates an aggregate operator. $$resultVariables = expressions()
+     * @param resultVariables the variables which stores the result of the aggregation
+     * @param isGlobal whether the aggregate operator is a global or local one
+     * @param expressions the aggregation functions desired
+     * @param inputOperator the input op that is feeding the aggregate operator
+     * @param context optimization context
+     * @param sourceLocation source location
+     * @return an aggregate operator with the specified information
+     * @throws AlgebricksException when there is error setting the type environment of the newly created aggregate op
+     */
+    private static AggregateOperator createAggregate(List<LogicalVariable> resultVariables, boolean isGlobal,
+            List<Mutable<ILogicalExpression>> expressions, MutableObject<ILogicalOperator> inputOperator,
+            IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+        AggregateOperator aggregateOperator = new AggregateOperator(resultVariables, expressions);
+        aggregateOperator.setPhysicalOperator(new AggregatePOperator());
+        aggregateOperator.setSourceLocation(sourceLocation);
+        aggregateOperator.getInputs().add(inputOperator);
+        aggregateOperator.setGlobal(isGlobal);
+        if (!isGlobal) {
+            aggregateOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
+        } else {
+            aggregateOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        }
+        aggregateOperator.recomputeSchema();
+        context.computeAndSetTypeEnvironmentForOperator(aggregateOperator);
+        return aggregateOperator;
+    }
+
+    private static ExchangeOperator createOneToOneExchangeOp(MutableObject<ILogicalOperator> inputOperator,
+            IOptimizationContext context) throws AlgebricksException {
+        ExchangeOperator exchangeOperator = new ExchangeOperator();
+        exchangeOperator.setPhysicalOperator(new OneToOneExchangePOperator());
+        exchangeOperator.getInputs().add(inputOperator);
+        exchangeOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        exchangeOperator.recomputeSchema();
+        context.computeAndSetTypeEnvironmentForOperator(exchangeOperator);
+        return exchangeOperator;
+    }
+
+    private static ForwardOperator createForward(String rangeMapKey, LogicalVariable rangeMapVariable,
+            MutableObject<ILogicalOperator> exchangeOpFromReplicate, MutableObject<ILogicalOperator> globalAggInput,
+            IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+        AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable);
+        rangeMapExpression.setSourceLocation(sourceLocation);
+        ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new MutableObject<>(rangeMapExpression));
+        forwardOperator.setSourceLocation(sourceLocation);
+        forwardOperator.setPhysicalOperator(new ForwardPOperator());
+        forwardOperator.getInputs().add(exchangeOpFromReplicate);
+        forwardOperator.getInputs().add(globalAggInput);
+        OperatorManipulationUtil.setOperatorMode(forwardOperator);
+        forwardOperator.recomputeSchema();
+        context.computeAndSetTypeEnvironmentForOperator(forwardOperator);
+        return forwardOperator;
+    }
+
     private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
         for (ILocalStructuralProperty lsp : cldLocals) {
             if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {