You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2018/10/16 04:18:33 UTC
[03/21] asterixdb git commit: [ASTERIXDB-2286][COMP][FUN][HYR]
Parallel Sort Optimization
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 07ff0ab..d879d36 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.algebricks.core.rewriter.base;
import java.util.Properties;
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+
public class PhysicalOptimizationConfig {
private static final int MB = 1048576;
@@ -31,10 +33,11 @@ public class PhysicalOptimizationConfig {
private static final String MAX_FRAMES_FOR_TEXTSEARCH = "MAX_FRAMES_FOR_TEXTSEARCH";
private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
-
private static final String DEFAULT_HASH_GROUP_TABLE_SIZE = "DEFAULT_HASH_GROUP_TABLE_SIZE";
private static final String DEFAULT_EXTERNAL_GROUP_TABLE_SIZE = "DEFAULT_EXTERNAL_GROUP_TABLE_SIZE";
private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE";
+ private static final String SORT_PARALLEL = "SORT_PARALLEL";
+ private static final String SORT_SAMPLES = "SORT_SAMPLES";
private Properties properties = new Properties();
@@ -143,6 +146,22 @@ public class PhysicalOptimizationConfig {
setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, tableSize);
}
+ public boolean getSortParallel() {
+ return getBoolean(SORT_PARALLEL, AlgebricksConfig.SORT_PARALLEL);
+ }
+
+ public void setSortParallel(boolean sortParallel) {
+ setBoolean(SORT_PARALLEL, sortParallel);
+ }
+
+ public int getSortSamples() {
+ return getInt(SORT_SAMPLES, AlgebricksConfig.SORT_SAMPLES);
+ }
+
+ public void setSortSamples(int sortSamples) {
+ setInt(SORT_SAMPLES, sortSamples);
+ }
+
private void setInt(String property, int value) {
properties.setProperty(property, Integer.toString(value));
}
@@ -167,4 +186,16 @@ public class PhysicalOptimizationConfig {
return Double.parseDouble(value);
}
+ private void setBoolean(String property, boolean value) {
+ properties.setProperty(property, Boolean.toString(value));
+ }
+
+ private boolean getBoolean(String property, boolean defaultValue) {
+ String value = properties.getProperty(property);
+ if (value == null) {
+ return defaultValue;
+ } else {
+ return Boolean.parseBoolean(value);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java
index 8ada0ac..d6895e3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java
@@ -29,12 +29,16 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IActivity;
@@ -45,15 +49,13 @@ import org.apache.hyracks.api.job.JobSpecification;
public class DotFormatGenerator {
- private DotFormatGenerator() {
- }
+ private final LogicalOperatorDotVisitor dotVisitor = new LogicalOperatorDotVisitor();
/**
- * Generates DOT format for {@link JobActivityGraph} that can be visualized
- * using any DOT format visualizer.
+ * Generates DOT format plan for {@link JobActivityGraph} that can be visualized using any DOT format visualizer.
*
* @param jobActivityGraph The job activity graph
- * @return DOT format
+ * @return DOT format plan
*/
public static String generate(final JobActivityGraph jobActivityGraph) {
final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("JobActivityGraph"));
@@ -146,92 +148,74 @@ public class DotFormatGenerator {
}
/**
- * Generates DOT format for {@link JobSpecification} that can be visualized
- * using any DOT format visualizer.
+ * Generates DOT format plan for {@link JobSpecification} that can be visualized using any DOT format visualizer.
*
* @param jobSpecification The job specification
- * @return DOT format
+ * @return DOT format plan
*/
public static String generate(final JobSpecification jobSpecification) {
final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("JobSpecification"));
final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap = jobSpecification.getConnectorMap();
- final Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> cOp =
+ final Set<Constraint> constraints = jobSpecification.getUserConstraints();
+ Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> cOp =
jobSpecification.getConnectorOperatorMap();
- ConnectorDescriptorId connectorId;
- IConnectorDescriptor connector;
- IOperatorDescriptor leftOperator;
- IOperatorDescriptor rightOperator;
- DotFormatBuilder.Node sourceNode;
- DotFormatBuilder.Node destinationNode;
- String source;
- String destination;
- String edgeLabel;
- for (Map.Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : cOp
- .entrySet()) {
- connectorId = entry.getKey();
- connector = connectorMap.get(connectorId);
- edgeLabel = connector.getClass().getName().substring(connector.getClass().getName().lastIndexOf(".") + 1);
- edgeLabel += "-" + connectorId;
- leftOperator = entry.getValue().getLeft().getLeft();
- rightOperator = entry.getValue().getRight().getLeft();
- source = leftOperator.getClass().getName()
- .substring(leftOperator.getClass().getName().lastIndexOf(".") + 1);
- sourceNode =
- graphBuilder.createNode(DotFormatBuilder.StringValue.of(leftOperator.getOperatorId().toString()),
- DotFormatBuilder.StringValue.of(leftOperator.toString() + "-" + source));
- destination = rightOperator.getClass().getName()
- .substring(rightOperator.getClass().getName().lastIndexOf(".") + 1);
- destinationNode =
- graphBuilder.createNode(DotFormatBuilder.StringValue.of(rightOperator.getOperatorId().toString()),
- DotFormatBuilder.StringValue.of(rightOperator.toString() + "-" + destination));
- graphBuilder.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of(edgeLabel));
- }
-
+ cOp.forEach((connId, srcAndDest) -> addToGraph(graphBuilder, constraints, connectorMap, connId, srcAndDest));
return graphBuilder.getDotDocument();
}
/**
- * Generates DOT format for {@link ILogicalPlan} that can be visualized
- * using any DOT format visualizer.
+ * Generates DOT format plan for {@link ILogicalPlan} that can be visualized using any DOT format visualizer.
*
* @param plan The logical plan
- * @param dotVisitor The DOT visitor
- * @return DOT format
- * @throws AlgebricksException
+ * @param showDetails whether to show the details of the operator like physical properties
+ * @return DOT format plan
+ * @throws AlgebricksException When one operator throws an exception while visiting it.
*/
- public static String generate(ILogicalPlan plan, LogicalOperatorDotVisitor dotVisitor) throws AlgebricksException {
- final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("Plan"));
+ public String generate(ILogicalPlan plan, boolean showDetails) throws AlgebricksException {
ILogicalOperator root = plan.getRoots().get(0).getValue();
- generateNode(graphBuilder, root, dotVisitor, new HashSet<>());
+ return generate(root, showDetails);
+ }
+
+ /**
+ * Generates DOT format plan considering "startingOp" as the root operator.
+ *
+ * @param startingOp the starting operator
+ * @param showDetails whether to show the details of the operator like physical properties
+ * @return DOT format plan
+ * @throws AlgebricksException When one operator throws an exception while visiting it.
+ */
+ public String generate(ILogicalOperator startingOp, boolean showDetails) throws AlgebricksException {
+ final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("Plan"));
+ generateNode(graphBuilder, startingOp, showDetails, new HashSet<>());
return graphBuilder.getDotDocument();
}
- public static void generateNode(DotFormatBuilder dotBuilder, ILogicalOperator op,
- LogicalOperatorDotVisitor dotVisitor, Set<ILogicalOperator> operatorsVisited) throws AlgebricksException {
- DotFormatBuilder.StringValue destinationNodeLabel = formatStringOf(op, dotVisitor);
+ private void generateNode(DotFormatBuilder dotBuilder, ILogicalOperator op, boolean showDetails,
+ Set<ILogicalOperator> operatorsVisited) throws AlgebricksException {
+ DotFormatBuilder.StringValue destinationNodeLabel = formatStringOf(op, showDetails);
DotFormatBuilder.Node destinationNode = dotBuilder
.createNode(DotFormatBuilder.StringValue.of(Integer.toString(op.hashCode())), destinationNodeLabel);
DotFormatBuilder.StringValue sourceNodeLabel;
DotFormatBuilder.Node sourceNode;
for (Mutable<ILogicalOperator> child : op.getInputs()) {
- sourceNodeLabel = formatStringOf(child.getValue(), dotVisitor);
+ sourceNodeLabel = formatStringOf(child.getValue(), showDetails);
sourceNode = dotBuilder.createNode(
DotFormatBuilder.StringValue.of(Integer.toString(child.getValue().hashCode())), sourceNodeLabel);
dotBuilder.createEdge(sourceNode, destinationNode);
if (!operatorsVisited.contains(child.getValue())) {
- generateNode(dotBuilder, child.getValue(), dotVisitor, operatorsVisited);
+ generateNode(dotBuilder, child.getValue(), showDetails, operatorsVisited);
}
}
if (((AbstractLogicalOperator) op).hasNestedPlans()) {
ILogicalOperator nestedOperator;
for (ILogicalPlan nestedPlan : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
nestedOperator = nestedPlan.getRoots().get(0).getValue();
- sourceNodeLabel = formatStringOf(nestedOperator, dotVisitor);
+ sourceNodeLabel = formatStringOf(nestedOperator, showDetails);
sourceNode = dotBuilder.createNode(
DotFormatBuilder.StringValue.of(Integer.toString(nestedOperator.hashCode())), sourceNodeLabel);
dotBuilder.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of("subplan"));
if (!operatorsVisited.contains(nestedOperator)) {
- generateNode(dotBuilder, nestedOperator, dotVisitor, operatorsVisited);
+ generateNode(dotBuilder, nestedOperator, showDetails, operatorsVisited);
}
}
}
@@ -246,7 +230,7 @@ public class DotFormatGenerator {
sourceNode = destinationNode;
for (int i = 0; i < replicateOperator.getOutputs().size(); i++) {
replicateOutput = replicateOperator.getOutputs().get(i).getValue();
- destinationNodeLabel = formatStringOf(replicateOutput, dotVisitor);
+ destinationNodeLabel = formatStringOf(replicateOutput, showDetails);
destinationNode = dotBuilder.createNode(
DotFormatBuilder.StringValue.of(Integer.toString(replicateOutput.hashCode())),
destinationNodeLabel);
@@ -261,16 +245,52 @@ public class DotFormatGenerator {
operatorsVisited.add(op);
}
- private static DotFormatBuilder.StringValue formatStringOf(ILogicalOperator operator,
- LogicalOperatorDotVisitor dotVisitor) throws AlgebricksException {
- String formattedString = operator.accept(dotVisitor, null).trim();
- IPhysicalOperator physicalOperator = ((AbstractLogicalOperator) operator).getPhysicalOperator();
- if (physicalOperator != null) {
- formattedString += "\\n" + physicalOperator.toString().trim() + " |" + operator.getExecutionMode() + "|";
- } else {
- formattedString += "\\n|" + operator.getExecutionMode() + "|";
- }
-
+ private DotFormatBuilder.StringValue formatStringOf(ILogicalOperator operator, boolean showDetails)
+ throws AlgebricksException {
+ String formattedString = operator.accept(dotVisitor, showDetails).trim();
return DotFormatBuilder.StringValue.of(formattedString);
}
+
+ private static void addToGraph(DotFormatBuilder graph, Set<Constraint> constraints,
+ Map<ConnectorDescriptorId, IConnectorDescriptor> connMap, ConnectorDescriptorId connId,
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> srcAndDest) {
+ IConnectorDescriptor connector = connMap.get(connId);
+ String edgeLabel;
+ edgeLabel = connector.getClass().getName().substring(connector.getClass().getName().lastIndexOf(".") + 1);
+ edgeLabel += "-" + connId;
+ IOperatorDescriptor sourceOp = srcAndDest.getLeft().getLeft();
+ IOperatorDescriptor destOp = srcAndDest.getRight().getLeft();
+ StringBuilder source = new StringBuilder(
+ sourceOp.getClass().getName().substring(sourceOp.getClass().getName().lastIndexOf(".") + 1));
+ StringBuilder destination = new StringBuilder(
+ destOp.getClass().getName().substring(destOp.getClass().getName().lastIndexOf(".") + 1));
+ // constraints
+ for (Constraint constraint : constraints) {
+ LValueConstraintExpression lvalue = constraint.getLValue();
+ if (lvalue.getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) {
+ PartitionCountExpression count = (PartitionCountExpression) lvalue;
+ if (count.getOperatorDescriptorId().equals(sourceOp.getOperatorId())) {
+ source.append("\n").append(constraint);
+ }
+ if (count.getOperatorDescriptorId().equals(destOp.getOperatorId())) {
+ destination.append("\n").append(constraint);
+ }
+ } else if (lvalue.getTag() == ConstraintExpression.ExpressionTag.PARTITION_LOCATION) {
+ PartitionLocationExpression location = (PartitionLocationExpression) lvalue;
+ if (location.getOperatorDescriptorId().equals(sourceOp.getOperatorId())) {
+ source.append("\n").append(constraint);
+ }
+ if (location.getOperatorDescriptorId().equals(destOp.getOperatorId())) {
+ destination.append("\n").append(constraint);
+ }
+ }
+ }
+ DotFormatBuilder.Node sourceNode =
+ graph.createNode(DotFormatBuilder.StringValue.of(sourceOp.getOperatorId().toString()),
+ DotFormatBuilder.StringValue.of(sourceOp.toString() + "-" + source));
+ DotFormatBuilder.Node destinationNode =
+ graph.createNode(DotFormatBuilder.StringValue.of(destOp.getOperatorId().toString()),
+ DotFormatBuilder.StringValue.of(destOp.toString() + "-" + destination));
+ graph.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of(edgeLabel));
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 2cb2d35..113d205 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -18,14 +18,20 @@
*/
package org.apache.hyracks.algebricks.core.utils;
+import static org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType.LOCAL_GROUPING_PROPERTY;
+import static org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType.LOCAL_ORDER_PROPERTY;
+
import java.util.List;
+import java.util.Map;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -35,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -62,9 +69,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOpe
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String, Void> {
+public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String, Boolean> {
private final StringBuilder stringBuilder;
@@ -82,161 +94,214 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
}
@Override
- public String visitAggregateOperator(AggregateOperator op, Void noArgs) throws AlgebricksException {
+ public String visitAggregateOperator(AggregateOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("aggregate ").append(str(op.getVariables())).append(" <- ");
- pprintExprList(op.getExpressions());
+ printExprList(op.getExpressions());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitRunningAggregateOperator(RunningAggregateOperator op, Void noArgs) throws AlgebricksException {
+ public String visitRunningAggregateOperator(RunningAggregateOperator op, Boolean showDetails)
+ throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("running-aggregate ").append(str(op.getVariables())).append(" <- ");
- pprintExprList(op.getExpressions());
+ printExprList(op.getExpressions());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void noArgs) throws AlgebricksException {
+ public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
stringBuilder.append("empty-tuple-source");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitGroupByOperator(GroupByOperator op, Void noArgs) throws AlgebricksException {
+ public String visitGroupByOperator(GroupByOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("group by").append(op.isGroupAll() ? " (all)" : "").append(" (");
- pprintVeList(op.getGroupByList());
+ printVariableAndExprList(op.getGroupByList());
stringBuilder.append(") decor (");
- pprintVeList(op.getDecorList());
+ printVariableAndExprList(op.getDecorList());
stringBuilder.append(")");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitDistinctOperator(DistinctOperator op, Void noArgs) throws AlgebricksException {
+ public String visitDistinctOperator(DistinctOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
stringBuilder.append("distinct (");
- pprintExprList(op.getExpressions());
+ printExprList(op.getExpressions());
stringBuilder.append(")");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitInnerJoinOperator(InnerJoinOperator op, Void noArgs) throws AlgebricksException {
+ public String visitInnerJoinOperator(InnerJoinOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("join (").append(op.getCondition().getValue().toString()).append(")");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void noArgs) throws AlgebricksException {
+ public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("left outer join (").append(op.getCondition().getValue().toString()).append(")");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void noArgs) throws AlgebricksException {
+ public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Boolean showDetails)
+ throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("nested tuple source");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitOrderOperator(OrderOperator op, Void noArgs) throws AlgebricksException {
+ public String visitOrderOperator(OrderOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("order ");
for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
if (op.getTopK() != -1) {
stringBuilder.append("(topK: ").append(op.getTopK()).append(") ");
}
- String fst = getOrderString(p.first);
- stringBuilder.append("(").append(fst).append(", ").append(p.second.getValue().toString()).append(") ");
+ stringBuilder.append("(");
+ switch (p.first.getKind()) {
+ case ASC:
+ stringBuilder.append("ASC");
+ break;
+ case DESC:
+ stringBuilder.append("DESC");
+ break;
+ default:
+ final Mutable<ILogicalExpression> expressionRef = p.first.getExpressionRef();
+ stringBuilder.append(expressionRef == null ? "null" : expressionRef.toString());
+ }
+ stringBuilder.append(", ").append(p.second.getValue().toString()).append(") ");
}
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
- private String getOrderString(OrderOperator.IOrder first) {
- switch (first.getKind()) {
- case ASC:
- return "ASC";
- case DESC:
- return "DESC";
- default:
- return first.getExpressionRef().toString();
- }
- }
-
@Override
- public String visitAssignOperator(AssignOperator op, Void noArgs) throws AlgebricksException {
+ public String visitAssignOperator(AssignOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("assign ").append(str(op.getVariables())).append(" <- ");
- pprintExprList(op.getExpressions());
+ printExprList(op.getExpressions());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitWriteOperator(WriteOperator op, Void noArgs) throws AlgebricksException {
+ public String visitWriteOperator(WriteOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
stringBuilder.append("write ");
- pprintExprList(op.getExpressions());
+ printExprList(op.getExpressions());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitDistributeResultOperator(DistributeResultOperator op, Void noArgs) throws AlgebricksException {
+ public String visitDistributeResultOperator(DistributeResultOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
stringBuilder.append("distribute result ");
- pprintExprList(op.getExpressions());
+ printExprList(op.getExpressions());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitWriteResultOperator(WriteResultOperator op, Void noArgs) throws AlgebricksException {
+ public String visitWriteResultOperator(WriteResultOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
stringBuilder.append("load ").append(str(op.getDataSource())).append(" from ")
.append(op.getPayloadExpression().getValue().toString()).append(" partitioned by ");
- pprintExprList(op.getKeyExpressions());
+ printExprList(op.getKeyExpressions());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitSelectOperator(SelectOperator op, Void noArgs) throws AlgebricksException {
+ public String visitSelectOperator(SelectOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
stringBuilder.append("select (").append(op.getCondition().getValue().toString()).append(")");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitProjectOperator(ProjectOperator op, Void noArgs) throws AlgebricksException {
+ public String visitProjectOperator(ProjectOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("project ").append("(").append(op.getVariables()).append(")");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitSubplanOperator(SubplanOperator op, Void noArgs) throws AlgebricksException {
+ public String visitSubplanOperator(SubplanOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("subplan {}");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitUnionOperator(UnionAllOperator op, Void noArgs) throws AlgebricksException {
+ public String visitUnionOperator(UnionAllOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("union");
for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op.getVariableMappings()) {
stringBuilder.append(" (").append(v.first).append(", ").append(v.second).append(", ").append(v.third)
.append(")");
}
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitIntersectOperator(IntersectOperator op, Void noArgs) throws AlgebricksException {
+ public String visitIntersectOperator(IntersectOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("intersect (");
stringBuilder.append('[');
@@ -261,154 +326,183 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
stringBuilder.append(']');
}
stringBuilder.append("])");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitUnnestOperator(UnnestOperator op, Void noArgs) throws AlgebricksException {
+ public String visitUnnestOperator(UnnestOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("unnest ").append(op.getVariable());
if (op.getPositionalVariable() != null) {
stringBuilder.append(" at ").append(op.getPositionalVariable());
}
stringBuilder.append(" <- ").append(op.getExpressionRef().getValue().toString());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void noArgs) throws AlgebricksException {
+ public String visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Boolean showDetails)
+ throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("outer-unnest ").append(op.getVariable());
if (op.getPositionalVariable() != null) {
stringBuilder.append(" at ").append(op.getPositionalVariable());
}
stringBuilder.append(" <- ").append(op.getExpressionRef().getValue().toString());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitUnnestMapOperator(UnnestMapOperator op, Void noArgs) throws AlgebricksException {
+ public String visitUnnestMapOperator(UnnestMapOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
- printAbstractUnnestMapOperator(op, "unnest-map");
- appendSelectConditionInformation(stringBuilder, op.getSelectCondition());
- appendLimitInformation(stringBuilder, op.getOutputLimit());
+ printAbstractUnnestMapOperator(op, "unnest-map", showDetails);
+ appendSelectConditionInformation(op.getSelectCondition());
+ appendLimitInformation(op.getOutputLimit());
return stringBuilder.toString();
}
@Override
- public String visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void noArgs)
+ public String visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean showDetails)
throws AlgebricksException {
stringBuilder.setLength(0);
- printAbstractUnnestMapOperator(op, "left-outer-unnest-map");
+ printAbstractUnnestMapOperator(op, "left-outer-unnest-map", showDetails);
return stringBuilder.toString();
}
- private void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, String opSignature) {
+ private void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, String opSignature, boolean show) {
stringBuilder.append(opSignature).append(" ").append(op.getVariables()).append(" <- ")
.append(op.getExpressionRef().getValue().toString());
- appendFilterInformation(stringBuilder, op.getMinFilterVars(), op.getMaxFilterVars());
+ appendFilterInformation(op.getMinFilterVars(), op.getMaxFilterVars());
+ appendSchema(op, show);
+ appendAnnotations(op, show);
+ appendPhysicalOperatorInfo(op, show);
}
@Override
- public String visitDataScanOperator(DataSourceScanOperator op, Void noArgs) throws AlgebricksException {
+ public String visitDataScanOperator(DataSourceScanOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("data-scan ").append(op.getProjectVariables()).append("<-").append(op.getVariables())
.append(" <- ").append(op.getDataSource());
- appendFilterInformation(stringBuilder, op.getMinFilterVars(), op.getMaxFilterVars());
- appendSelectConditionInformation(stringBuilder, op.getSelectCondition());
- appendLimitInformation(stringBuilder, op.getOutputLimit());
+ appendFilterInformation(op.getMinFilterVars(), op.getMaxFilterVars());
+ appendSelectConditionInformation(op.getSelectCondition());
+ appendLimitInformation(op.getOutputLimit());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
- private void appendFilterInformation(StringBuilder plan, List<LogicalVariable> minFilterVars,
- List<LogicalVariable> maxFilterVars) {
+ private void appendFilterInformation(List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars) {
if (minFilterVars != null || maxFilterVars != null) {
- plan.append(" with filter on");
+ stringBuilder.append(" with filter on");
}
if (minFilterVars != null) {
- plan.append(" min:").append(minFilterVars);
+ stringBuilder.append(" min:").append(minFilterVars);
}
if (maxFilterVars != null) {
- plan.append(" max:").append(maxFilterVars);
+ stringBuilder.append(" max:").append(maxFilterVars);
}
}
- private Void appendSelectConditionInformation(StringBuilder plan, Mutable<ILogicalExpression> condition)
- throws AlgebricksException {
+ private void appendSelectConditionInformation(Mutable<ILogicalExpression> condition) throws AlgebricksException {
if (condition != null) {
- plan.append(" condition:").append(condition.getValue().toString());
+ stringBuilder.append(" condition:").append(condition.getValue().toString());
}
- return null;
}
- private Void appendLimitInformation(StringBuilder plan, long outputLimit) throws AlgebricksException {
+ private void appendLimitInformation(long outputLimit) throws AlgebricksException {
if (outputLimit >= 0) {
- plan.append(" limit:").append(String.valueOf(outputLimit));
+ stringBuilder.append(" limit:").append(String.valueOf(outputLimit));
}
- return null;
}
@Override
- public String visitLimitOperator(LimitOperator op, Void noArgs) throws AlgebricksException {
+ public String visitLimitOperator(LimitOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("limit ").append(op.getMaxObjects().getValue().toString());
ILogicalExpression offset = op.getOffset().getValue();
if (offset != null) {
stringBuilder.append(", ").append(offset.toString());
}
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitExchangeOperator(ExchangeOperator op, Void noArgs) throws AlgebricksException {
+ public String visitExchangeOperator(ExchangeOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("exchange");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitScriptOperator(ScriptOperator op, Void noArgs) throws AlgebricksException {
+ public String visitScriptOperator(ScriptOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("script (in: ").append(op.getInputVariables()).append(") (out: ")
.append(op.getOutputVariables()).append(")");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitReplicateOperator(ReplicateOperator op, Void noArgs) throws AlgebricksException {
+ public String visitReplicateOperator(ReplicateOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("replicate");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitSplitOperator(SplitOperator op, Void noArgs) throws AlgebricksException {
+ public String visitSplitOperator(SplitOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
Mutable<ILogicalExpression> branchingExpression = op.getBranchingExpression();
stringBuilder.append("split ").append(branchingExpression.getValue().toString());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitMaterializeOperator(MaterializeOperator op, Void noArgs) throws AlgebricksException {
+ public String visitMaterializeOperator(MaterializeOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("materialize");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void noArgs)
- throws AlgebricksException {
+ public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
String header = getIndexOpString(op.getOperation());
stringBuilder.append(header).append(str(op.getDataSource())).append(" from record: ")
.append(op.getPayloadExpression().getValue().toString());
if (op.getAdditionalNonFilteringExpressions() != null) {
stringBuilder.append(", meta: ");
- pprintExprList(op.getAdditionalNonFilteringExpressions());
+ printExprList(op.getAdditionalNonFilteringExpressions());
}
stringBuilder.append(" partitioned by ");
- pprintExprList(op.getPrimaryKeyExpressions());
+ printExprList(op.getPrimaryKeyExpressions());
if (op.getOperation() == Kind.UPSERT) {
stringBuilder.append(" out: ([record-before-upsert:").append(op.getBeforeOpRecordVar());
if (op.getBeforeOpAdditionalNonFilteringVars() != null) {
@@ -419,27 +513,32 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
if (op.isBulkload()) {
stringBuilder.append(" [bulkload]");
}
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void noArgs)
- throws AlgebricksException {
+ public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
String header = getIndexOpString(op.getOperation());
stringBuilder.append(header).append(op.getIndexName()).append(" on ")
.append(str(op.getDataSourceIndex().getDataSource())).append(" from ");
if (op.getOperation() == Kind.UPSERT) {
stringBuilder.append(" replace:");
- pprintExprList(op.getPrevSecondaryKeyExprs());
+ printExprList(op.getPrevSecondaryKeyExprs());
stringBuilder.append(" with:");
- pprintExprList(op.getSecondaryKeyExpressions());
+ printExprList(op.getSecondaryKeyExpressions());
} else {
- pprintExprList(op.getSecondaryKeyExpressions());
+ printExprList(op.getSecondaryKeyExpressions());
}
if (op.isBulkload()) {
stringBuilder.append(" [bulkload]");
}
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@@ -452,60 +551,143 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
case UPSERT:
return "upsert into ";
}
- return null;
+ return "";
}
@Override
- public String visitTokenizeOperator(TokenizeOperator op, Void noArgs) throws AlgebricksException {
+ public String visitTokenizeOperator(TokenizeOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append("tokenize ").append(str(op.getTokenizeVars())).append(" <- ");
- pprintExprList(op.getSecondaryKeyExpressions());
+ printExprList(op.getSecondaryKeyExpressions());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitSinkOperator(SinkOperator op, Void noArgs) throws AlgebricksException {
+ public String visitSinkOperator(SinkOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
stringBuilder.append("sink");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
@Override
- public String visitDelegateOperator(DelegateOperator op, Void noArgs) throws AlgebricksException {
+ public String visitDelegateOperator(DelegateOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
stringBuilder.append(op.toString());
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
+ return stringBuilder.toString();
+ }
+
+ @Override
+ public String visitForwardOperator(ForwardOperator op, Boolean showDetails) throws AlgebricksException {
+ stringBuilder.setLength(0);
+ stringBuilder.append("forward(").append(op.getRangeMapExpression().getValue().toString()).append(")");
+ appendSchema(op, showDetails);
+ appendAnnotations(op, showDetails);
+ appendPhysicalOperatorInfo(op, showDetails);
return stringBuilder.toString();
}
- private void pprintExprList(List<Mutable<ILogicalExpression>> expressions) {
+ private void printExprList(List<Mutable<ILogicalExpression>> expressions) {
+ stringBuilder.append("[");
+ expressions.forEach(exprRef -> stringBuilder.append(exprRef.getValue().toString()).append(", "));
+ stringBuilder.append("]");
+ }
+
+ private void printVariableAndExprList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> variableExprList) {
stringBuilder.append("[");
boolean first = true;
- for (Mutable<ILogicalExpression> exprRef : expressions) {
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> variableExpressionPair : variableExprList) {
if (first) {
first = false;
} else {
- stringBuilder.append(", ");
+ stringBuilder.append("; ");
+ }
+ if (variableExpressionPair.first != null) {
+ stringBuilder.append(variableExpressionPair.first).append(" := ").append(variableExpressionPair.second);
+ } else {
+ stringBuilder.append(variableExpressionPair.second.getValue().toString());
}
- stringBuilder.append(exprRef.getValue().toString());
}
stringBuilder.append("]");
}
- private void pprintVeList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList) {
- stringBuilder.append("[");
- boolean fst = true;
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : vePairList) {
- if (fst) {
- fst = false;
- } else {
- stringBuilder.append("; ");
+ private void appendSchema(AbstractLogicalOperator op, boolean show) {
+ if (show) {
+ stringBuilder.append("\\nSchema: ");
+ final List<LogicalVariable> schema = op.getSchema();
+ stringBuilder.append(schema == null ? "null" : schema);
+ }
+ }
+
+ private void appendAnnotations(AbstractLogicalOperator op, boolean show) {
+ if (show) {
+ final Map<String, Object> annotations = op.getAnnotations();
+ if (!annotations.isEmpty()) {
+ stringBuilder.append("\\nAnnotations: ").append(annotations);
}
- if (ve.first != null) {
- stringBuilder.append(ve.first).append(" := ").append(ve.second);
- } else {
- stringBuilder.append(ve.second.getValue().toString());
+ }
+ }
+
+ private void appendPhysicalOperatorInfo(AbstractLogicalOperator op, boolean show) {
+ IPhysicalOperator physicalOp = op.getPhysicalOperator();
+ stringBuilder.append("\\n").append(physicalOp == null ? "null" : physicalOp.toString().trim());
+ stringBuilder.append(", Exec: ").append(op.getExecutionMode());
+ if (show) {
+ IPhysicalPropertiesVector properties = physicalOp == null ? null : physicalOp.getDeliveredProperties();
+ List<ILocalStructuralProperty> localProp = properties == null ? null : properties.getLocalProperties();
+ IPartitioningProperty partitioningProp = properties == null ? null : properties.getPartitioningProperty();
+ if (localProp != null) {
+ stringBuilder.append("\\nProperties in each partition: [");
+ for (ILocalStructuralProperty property : localProp) {
+ if (property == null) {
+ stringBuilder.append("null, ");
+ } else if (property.getPropertyType() == LOCAL_ORDER_PROPERTY) {
+ stringBuilder.append("ordered by ");
+ } else if (property.getPropertyType() == LOCAL_GROUPING_PROPERTY) {
+ stringBuilder.append("group by ");
+ }
+ stringBuilder.append(property).append(", ");
+ }
+ stringBuilder.append("]");
+ }
+
+ if (partitioningProp != null) {
+ stringBuilder.append("\\n").append(partitioningProp.getPartitioningType()).append(":");
+ INodeDomain nodeDomain = partitioningProp.getNodeDomain();
+ stringBuilder.append("\\n ");
+ if (nodeDomain != null && nodeDomain.cardinality() != null) {
+ stringBuilder.append(nodeDomain.cardinality()).append(" partitions. ");
+ }
+ switch (partitioningProp.getPartitioningType()) {
+ case BROADCAST:
+ stringBuilder.append("Data is broadcast to partitions.");
+ break;
+ case RANDOM:
+ stringBuilder.append("Data is randomly partitioned.");
+ break;
+ case ORDERED_PARTITIONED:
+ stringBuilder.append("Data is orderly partitioned via a range.");
+ break;
+ case UNORDERED_PARTITIONED:
+ stringBuilder.append("Data is hash partitioned.");
+ break;
+ case UNPARTITIONED:
+ stringBuilder.append("Data is in one place.");
+ }
+ if (nodeDomain instanceof DefaultNodeGroupDomain) {
+ DefaultNodeGroupDomain nd = (DefaultNodeGroupDomain) nodeDomain;
+ stringBuilder.append("\\n").append(nd);
+ }
}
}
- stringBuilder.append("]");
+
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 6f7f86a..cdab2f4 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -21,9 +21,11 @@ package org.apache.hyracks.algebricks.rewriter.rules;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -40,28 +42,41 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreSortedDistinctByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SequentialMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
@@ -90,13 +105,20 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
private static final String HASH_MERGE = "hash_merge";
private static final String TRUE_CONSTANT = "true";
private PhysicalOptimizationConfig physicalOptimizationConfig;
+ private final FunctionIdentifier rangeMapFunction;
+ private final FunctionIdentifier localSamplingFun;
+
+ public EnforceStructuralPropertiesRule(FunctionIdentifier rangeMapFunction, FunctionIdentifier localSamplingFun) {
+ this.rangeMapFunction = rangeMapFunction;
+ this.localSamplingFun = localSamplingFun;
+ }
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
@@ -204,6 +226,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
boolean changed = false;
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+
optimizeUsingConstraintsAndEquivClasses(op);
PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required, context);
IPhysicalPropertiesVector[] reqdProperties = null;
@@ -214,26 +237,19 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
// compute properties and figure out the domain
INodeDomain childrenDomain = null;
- {
- int j = 0;
- for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
- AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
- // recursive call
- if (physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context)) {
- changed = true;
- }
- child.computeDeliveredPhysicalProperties(context);
- IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
- if (childrenDomain == null) {
- childrenDomain = delivered.getPartitioningProperty().getNodeDomain();
- } else {
- INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain();
- if (!childrenDomain.sameAs(dom2)) {
- childrenDomain = context.getComputationNodeDomain();
- }
- }
- j++;
+ int j = 0;
+ for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+ AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+ changed |= physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context);
+ child.computeDeliveredPhysicalProperties(context);
+ IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
+ INodeDomain childDomain = delivered.getPartitioningProperty().getNodeDomain();
+ if (childrenDomain == null) {
+ childrenDomain = delivered.getPartitioningProperty().getNodeDomain();
+ } else if (!childrenDomain.sameAs(childDomain)) {
+ childrenDomain = context.getComputationNodeDomain();
}
+ j++;
}
if (reqdProperties != null) {
@@ -252,7 +268,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
IPartitioningProperty firstDeliveredPartitioning = null;
// Enforce data properties in a top-down manner.
- for (int j = 0; j < op.getInputs().size(); j++) {
+ for (j = 0; j < op.getInputs().size(); j++) {
// Starts from a partitioning-compatible child if any to loop over all children.
int childIndex = (j + startChildIndex) % op.getInputs().size();
IPhysicalPropertiesVector requiredProperty = reqdProperties[childIndex];
@@ -555,6 +571,17 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
return new MutableObject<ILogicalOperator>(oo);
}
+ /**
+ * Adds exchange operators (connectors) between {@code op} & its child at index {@code childIdx}.
+ * @param op the parent operator that is requiring a specific kind of connector at its child
+ * @param i the child index where we want to have the connector
+ * @param pp the required partitioning property at that child (i.e. the required connector)
+ * @param required the physical properties required at that child (partitioning + local properties)
+ * @param deliveredByChild the physical properties delivered by that child (partitioning + local properties)
+ * @param domain the destination domain of nodes that we want the connector to connect to
+ * @param context {@link IOptimizationContext}
+ * @throws AlgebricksException
+ */
private void addPartitioningEnforcers(ILogicalOperator op, int i, IPartitioningProperty pp,
IPhysicalPropertiesVector required, IPhysicalPropertiesVector deliveredByChild, INodeDomain domain,
IOptimizationContext context) throws AlgebricksException {
@@ -562,52 +589,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
IPhysicalOperator pop;
switch (pp.getPartitioningType()) {
case UNPARTITIONED: {
- List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
- if (ordCols.isEmpty()) {
- pop = new RandomMergeExchangePOperator();
- } else {
- if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
- IRangeMap rangeMap =
- (IRangeMap) op.getAnnotations().get(OperatorAnnotations.USE_RANGE_CONNECTOR);
- pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap);
- } else {
- OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
- sortColumns = ordCols.toArray(sortColumns);
- pop = new SortMergeExchangePOperator(sortColumns);
- }
- }
+ pop = createMergingConnector(op, domain, deliveredByChild);
break;
}
case UNORDERED_PARTITIONED: {
- List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet());
- String hashMergeHint = (String) context.getMetadataProvider().getConfig().get(HASH_MERGE);
- if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
- pop = new HashPartitionExchangePOperator(varList, domain);
- break;
- }
- List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
- List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
- boolean propWasSet = false;
- pop = null;
- if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
- AbstractLogicalOperator c = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
- Map<LogicalVariable, EquivalenceClass> ecs = context.getEquivalenceClassMap(c);
- List<FunctionalDependency> fds = context.getFDList(c);
- if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
- List<OrderColumn> orderColumns =
- getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
- pop = new HashPartitionMergeExchangePOperator(orderColumns, varList, domain);
- propWasSet = true;
- }
- }
- if (!propWasSet) {
- pop = new HashPartitionExchangePOperator(varList, domain);
- }
+ pop = createHashConnector(context, deliveredByChild, domain, required, pp, i, op);
break;
}
case ORDERED_PARTITIONED: {
- pop = new RangePartitionExchangePOperator(((OrderedPartitionedProperty) pp).getOrderColumns(),
- domain, null);
+ pop = createRangePartitionerConnector((AbstractLogicalOperator) op, domain, pp, i, context);
break;
}
case BROADCAST: {
@@ -640,6 +630,264 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
}
}
+ private IPhysicalOperator createMergingConnector(ILogicalOperator parentOp, INodeDomain domain,
+ IPhysicalPropertiesVector deliveredByChild) {
+ IPhysicalOperator mergingConnector;
+ List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
+ if (ordCols.isEmpty()) {
+ IPartitioningProperty partitioningDeliveredByChild = deliveredByChild.getPartitioningProperty();
+ if (partitioningDeliveredByChild.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED) {
+ mergingConnector = new SequentialMergeExchangePOperator();
+ } else {
+ mergingConnector = new RandomMergeExchangePOperator();
+ }
+ } else {
+ if (parentOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
+ RangeMap rangeMap = (RangeMap) parentOp.getAnnotations().get(OperatorAnnotations.USE_STATIC_RANGE);
+ mergingConnector = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap);
+ } else {
+ OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
+ sortColumns = ordCols.toArray(sortColumns);
+ mergingConnector = new SortMergeExchangePOperator(sortColumns);
+ }
+ }
+ return mergingConnector;
+ }
+
+ private IPhysicalOperator createHashConnector(IOptimizationContext ctx, IPhysicalPropertiesVector deliveredByChild,
+ INodeDomain domain, IPhysicalPropertiesVector requiredAtChild, IPartitioningProperty rqdPartitioning,
+ int childIndex, ILogicalOperator parentOp) {
+ IPhysicalOperator hashConnector;
+ List<LogicalVariable> vars = new ArrayList<>(((UnorderedPartitionedProperty) rqdPartitioning).getColumnSet());
+ String hashMergeHint = (String) ctx.getMetadataProvider().getConfig().get(HASH_MERGE);
+ if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
+ hashConnector = new HashPartitionExchangePOperator(vars, domain);
+ return hashConnector;
+ }
+ List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
+ List<ILocalStructuralProperty> reqdLocals = requiredAtChild.getLocalProperties();
+ boolean propWasSet = false;
+ hashConnector = null;
+ if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
+ AbstractLogicalOperator c = (AbstractLogicalOperator) parentOp.getInputs().get(childIndex).getValue();
+ Map<LogicalVariable, EquivalenceClass> ecs = ctx.getEquivalenceClassMap(c);
+ List<FunctionalDependency> fds = ctx.getFDList(c);
+ if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
+ List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
+ hashConnector = new HashPartitionMergeExchangePOperator(orderColumns, vars, domain);
+ propWasSet = true;
+ }
+ }
+ if (!propWasSet) {
+ hashConnector = new HashPartitionExchangePOperator(vars, domain);
+ }
+ return hashConnector;
+ }
+
+ /**
+ * Creates a range-based exchange operator.
+ * @param parentOp the operator requiring range-based partitioner to have input tuples repartitioned using a range
+ * @param domain the target node domain of the range-based partitioner
+ * @param requiredPartitioning {@see OrderedPartitionedProperty}
+ * @param childIndex the index of the child at which the required partitioning is needed
+ * @param ctx optimization context
+ * @return a range-based exchange operator
+ * @throws AlgebricksException
+ */
+ private IPhysicalOperator createRangePartitionerConnector(AbstractLogicalOperator parentOp, INodeDomain domain,
+ IPartitioningProperty requiredPartitioning, int childIndex, IOptimizationContext ctx)
+ throws AlgebricksException {
+ // options for range partitioning: 1. static range map, 2. dynamic range map computed at run time
+ List<OrderColumn> partitioningColumns = ((OrderedPartitionedProperty) requiredPartitioning).getOrderColumns();
+ if (parentOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
+ // TODO(ali): static range map implementation should be fixed to require ORDERED_PARTITION and come here.
+ RangeMap rangeMap = (RangeMap) parentOp.getAnnotations().get(OperatorAnnotations.USE_STATIC_RANGE);
+ return new RangePartitionExchangePOperator(partitioningColumns, domain, rangeMap);
+ } else {
+ return createDynamicRangePartitionExchangePOperator(parentOp, ctx, domain, partitioningColumns, childIndex);
+ }
+ }
+
+ private IPhysicalOperator createDynamicRangePartitionExchangePOperator(AbstractLogicalOperator parentOp,
+ IOptimizationContext ctx, INodeDomain targetDomain, List<OrderColumn> partitioningColumns, int childIndex)
+ throws AlgebricksException {
+ SourceLocation sourceLoc = parentOp.getSourceLocation();
+ // #1. create the replicate operator and add it above the source op feeding parent operator
+ ReplicateOperator replicateOp = createReplicateOperator(parentOp.getInputs().get(childIndex), ctx, sourceLoc);
+
+ // these two exchange ops are needed so that the parents of replicate stay the same during later optimizations.
+ // This is because replicate operator has references to its parents. If any later optimizations add new parents,
+ // then replicate would still point to the old ones.
+ MutableObject<ILogicalOperator> replicateOpRef = new MutableObject<>(replicateOp);
+ ExchangeOperator exchToLocalAgg = createOneToOneExchangeOp(replicateOpRef, ctx);
+ ExchangeOperator exchToForward = createOneToOneExchangeOp(replicateOpRef, ctx);
+ MutableObject<ILogicalOperator> exchToLocalAggRef = new MutableObject<>(exchToLocalAgg);
+ MutableObject<ILogicalOperator> exchToForwardRef = new MutableObject<>(exchToForward);
+
+ // add the exchange--to-forward at output 0, the exchange-to-local-aggregate at output 1
+ replicateOp.getOutputs().add(exchToForwardRef);
+ replicateOp.getOutputs().add(exchToLocalAggRef);
+ // materialize the data to be able to re-read the data again after sampling is done
+ replicateOp.getOutputMaterializationFlags()[0] = true;
+
+ // #2. create the aggregate operators and their sampling functions
+ // $$samplingResultVar = local_samplingFun($$partitioning_column)
+ // $$rangeMapResultVar = global_rangeMapFun($$samplingResultVar)
+ List<LogicalVariable> samplingResultVar = new ArrayList<>(1);
+ List<LogicalVariable> rangeMapResultVar = new ArrayList<>(1);
+ List<Mutable<ILogicalExpression>> samplingFun = new ArrayList<>(1);
+ List<Mutable<ILogicalExpression>> rangeMapFun = new ArrayList<>(1);
+
+ createAggregateFunction(ctx, samplingResultVar, samplingFun, rangeMapResultVar, rangeMapFun,
+ targetDomain.cardinality(), partitioningColumns, sourceLoc);
+
+ AggregateOperator localAggOp =
+ createAggregate(samplingResultVar, false, samplingFun, exchToLocalAggRef, ctx, sourceLoc);
+ MutableObject<ILogicalOperator> localAgg = new MutableObject<>(localAggOp);
+ AggregateOperator globalAggOp = createAggregate(rangeMapResultVar, true, rangeMapFun, localAgg, ctx, sourceLoc);
+ MutableObject<ILogicalOperator> globalAgg = new MutableObject<>(globalAggOp);
+
+ // #3. create the forward operator
+ String rangeMapKey = UUID.randomUUID().toString();
+ LogicalVariable rangeMapVar = rangeMapResultVar.get(0);
+ ForwardOperator forward = createForward(rangeMapKey, rangeMapVar, exchToForwardRef, globalAgg, ctx, sourceLoc);
+ MutableObject<ILogicalOperator> forwardRef = new MutableObject<>(forward);
+
+ // replace the old input of parentOp requiring the range partitioning with the new forward op
+ parentOp.getInputs().set(childIndex, forwardRef);
+ parentOp.recomputeSchema();
+ ctx.computeAndSetTypeEnvironmentForOperator(parentOp);
+
+ return new RangePartitionExchangePOperator(partitioningColumns, rangeMapKey, targetDomain);
+ }
+
+ private static ReplicateOperator createReplicateOperator(Mutable<ILogicalOperator> inputOperator,
+ IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+ ReplicateOperator replicateOperator = new ReplicateOperator(2);
+ replicateOperator.setPhysicalOperator(new ReplicatePOperator());
+ replicateOperator.setSourceLocation(sourceLocation);
+ replicateOperator.getInputs().add(inputOperator);
+ OperatorManipulationUtil.setOperatorMode(replicateOperator);
+ replicateOperator.recomputeSchema();
+ context.computeAndSetTypeEnvironmentForOperator(replicateOperator);
+ return replicateOperator;
+ }
+
+ /**
+ * Creates the sampling expressions and embeds them in {@code localAggFunctions} & {@code globalAggFunctions}. Also,
+ * creates the variables which will hold the result of each one.
+ * {@code localResultVariables},{@code localAggFunctions},{@code globalResultVariables} & {@code globalAggFunctions}
+ * will be used when creating the corresponding aggregate operators.
+ * @param context used to get new variables which will be assigned the samples & the range map
+ * @param localResultVariables the variable to which the stats (e.g. samples) info is assigned
+ * @param localAggFunctions the local sampling expression is added to this list
+ * @param globalResultVariables the variable to which the range map is assigned
+ * @param globalAggFunctions the expression generating a range map is added to this list
+ * @param numPartitions passed to the expression generating a range map to know how many split points are needed
+ * @param partFields the fields based on which the partitioner partitions the tuples, also sampled fields
+ * @param sourceLocation source location
+ */
+ private void createAggregateFunction(IOptimizationContext context, List<LogicalVariable> localResultVariables,
+ List<Mutable<ILogicalExpression>> localAggFunctions, List<LogicalVariable> globalResultVariables,
+ List<Mutable<ILogicalExpression>> globalAggFunctions, int numPartitions, List<OrderColumn> partFields,
+ SourceLocation sourceLocation) {
+ // prepare the arguments of the local sampling function: sampled fields
+ List<Mutable<ILogicalExpression>> sampledFields = new ArrayList<>(partFields.size());
+ partFields.forEach(f -> {
+ AbstractLogicalExpression sampledField = new VariableReferenceExpression(f.getColumn());
+ sampledField.setSourceLocation(sourceLocation);
+ sampledFields.add(new MutableObject<>(sampledField));
+ });
+
+ // local info
+ IFunctionInfo samplingFun = context.getMetadataProvider().lookupFunction(localSamplingFun);
+ AbstractFunctionCallExpression samplingExp =
+ new AggregateFunctionCallExpression(samplingFun, false, sampledFields);
+ samplingExp.setSourceLocation(sourceLocation);
+ LogicalVariable samplingResultVar = context.newVar();
+ localResultVariables.add(samplingResultVar);
+ localAggFunctions.add(new MutableObject<>(samplingExp));
+ Object[] samplingParam = { context.getPhysicalOptimizationConfig().getSortSamples() };
+ samplingExp.setOpaqueParameters(samplingParam);
+
+ // prepare the argument of the global range map generator function: the result of the local function
+ List<Mutable<ILogicalExpression>> arg = new ArrayList<>(1);
+ AbstractLogicalExpression samplingResultVarExp = new VariableReferenceExpression(samplingResultVar);
+ samplingResultVarExp.setSourceLocation(sourceLocation);
+ arg.add(new MutableObject<>(samplingResultVarExp));
+
+ // global info
+ IFunctionInfo rangeMapFun = context.getMetadataProvider().lookupFunction(rangeMapFunction);
+ AbstractFunctionCallExpression rangeMapExp = new AggregateFunctionCallExpression(rangeMapFun, true, arg);
+ rangeMapExp.setSourceLocation(sourceLocation);
+ globalResultVariables.add(context.newVar());
+ globalAggFunctions.add(new MutableObject<>(rangeMapExp));
+
+ int i = 0;
+ boolean[] ascendingFlags = new boolean[partFields.size()];
+ for (OrderColumn column : partFields) {
+ ascendingFlags[i] = column.getOrder() == OrderOperator.IOrder.OrderKind.ASC;
+ i++;
+ }
+ rangeMapExp.setOpaqueParameters(new Object[] { numPartitions, ascendingFlags });
+ }
+
+ /**
+ * Creates an aggregate operator. $$resultVariables = expressions()
+ * @param resultVariables the variables which stores the result of the aggregation
+ * @param isGlobal whether the aggregate operator is a global or local one
+ * @param expressions the aggregation functions desired
+ * @param inputOperator the input op that is feeding the aggregate operator
+ * @param context optimization context
+ * @param sourceLocation source location
+ * @return an aggregate operator with the specified information
+ * @throws AlgebricksException when there is error setting the type environment of the newly created aggregate op
+ */
+ private static AggregateOperator createAggregate(List<LogicalVariable> resultVariables, boolean isGlobal,
+ List<Mutable<ILogicalExpression>> expressions, MutableObject<ILogicalOperator> inputOperator,
+ IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+ AggregateOperator aggregateOperator = new AggregateOperator(resultVariables, expressions);
+ aggregateOperator.setPhysicalOperator(new AggregatePOperator());
+ aggregateOperator.setSourceLocation(sourceLocation);
+ aggregateOperator.getInputs().add(inputOperator);
+ aggregateOperator.setGlobal(isGlobal);
+ if (!isGlobal) {
+ aggregateOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
+ } else {
+ aggregateOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ }
+ aggregateOperator.recomputeSchema();
+ context.computeAndSetTypeEnvironmentForOperator(aggregateOperator);
+ return aggregateOperator;
+ }
+
+ private static ExchangeOperator createOneToOneExchangeOp(MutableObject<ILogicalOperator> inputOperator,
+ IOptimizationContext context) throws AlgebricksException {
+ ExchangeOperator exchangeOperator = new ExchangeOperator();
+ exchangeOperator.setPhysicalOperator(new OneToOneExchangePOperator());
+ exchangeOperator.getInputs().add(inputOperator);
+ exchangeOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+ exchangeOperator.recomputeSchema();
+ context.computeAndSetTypeEnvironmentForOperator(exchangeOperator);
+ return exchangeOperator;
+ }
+
+ private static ForwardOperator createForward(String rangeMapKey, LogicalVariable rangeMapVariable,
+ MutableObject<ILogicalOperator> exchangeOpFromReplicate, MutableObject<ILogicalOperator> globalAggInput,
+ IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+ AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable);
+ rangeMapExpression.setSourceLocation(sourceLocation);
+ ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new MutableObject<>(rangeMapExpression));
+ forwardOperator.setSourceLocation(sourceLocation);
+ forwardOperator.setPhysicalOperator(new ForwardPOperator());
+ forwardOperator.getInputs().add(exchangeOpFromReplicate);
+ forwardOperator.getInputs().add(globalAggInput);
+ OperatorManipulationUtil.setOperatorMode(forwardOperator);
+ forwardOperator.recomputeSchema();
+ context.computeAndSetTypeEnvironmentForOperator(forwardOperator);
+ return forwardOperator;
+ }
+
private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
for (ILocalStructuralProperty lsp : cldLocals) {
if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {