You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2018/10/16 04:18:34 UTC
[04/21] asterixdb git commit: [ASTERIXDB-2286][COMP][FUN][HYR]
Parallel Sort Optimization
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
new file mode 100644
index 0000000..db11712
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.TypePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer;
+import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Forward operator is used to forward data to different NCs based on a range map that is computed dynamically
+ * by doing a pass over the data itself to infer the range map. The operator takes two inputs:
+ * 1. Tuples/data (at index 0). The data is forwarded to the range-based connector which routes it to the target NC.
+ * 2. Range map (at index 1). The range map will be stored in Hyracks context, and the connector will pick it up.
+ * Forward operator will receive the range map when it is broadcast by the operator generating the range map after which
+ * the forward operator will start forwarding the data.
+ */
+public class ForwardOperator extends AbstractLogicalOperator {
+
+ private final String rangeMapKey;
+ private final Mutable<ILogicalExpression> rangeMapExpression;
+
+ public ForwardOperator(String rangeMapKey, Mutable<ILogicalExpression> rangeMapExpression) {
+ super();
+ this.rangeMapKey = rangeMapKey;
+ this.rangeMapExpression = rangeMapExpression;
+ }
+
+ public String getRangeMapKey() {
+ return rangeMapKey;
+ }
+
+ public Mutable<ILogicalExpression> getRangeMapExpression() {
+ return rangeMapExpression;
+ }
+
+ @Override
+ public LogicalOperatorTag getOperatorTag() {
+ return LogicalOperatorTag.FORWARD;
+ }
+
+ @Override
+ public void recomputeSchema() throws AlgebricksException {
+ // schema is equal to the schema of the data source at idx 0
+ setSchema(inputs.get(0).getValue().getSchema());
+ }
+
+ @Override
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+ return visitor.transform(rangeMapExpression);
+ }
+
+ @Override
+ public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+ return visitor.visitForwardOperator(this, arg);
+ }
+
+ @Override
+ public boolean isMap() {
+ return false;
+ }
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return new VariablePropagationPolicy() {
+
+ @Override
+ public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+ throws AlgebricksException {
+ // propagate the variables of the data source at idx 0
+ if (sources.length > 0) {
+ target.addAllVariables(sources[0]);
+ }
+ }
+ };
+ }
+
+ @Override
+ public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+ // propagate the type environment of the data source at idx 0
+ ITypeEnvPointer[] envPointers = new ITypeEnvPointer[] { new OpRefTypeEnvPointer(inputs.get(0), ctx) };
+ return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMissableTypeComputer(),
+ ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index d0aea60..9d853eb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -31,11 +31,12 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -276,6 +277,11 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long
}
@Override
+ public Long visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ return op.getInputs().get(0).getValue().accept(this, arg);
+ }
+
+ @Override
public Long visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
long cardinality = UNKNOWN;
for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index d0d121f..16fc1ed 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -53,6 +53,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -560,6 +561,12 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, IOptimizationContext ctx) throws AlgebricksException {
+ propagateFDsAndEquivClasses(op, ctx);
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, IOptimizationContext ctx) throws AlgebricksException {
setEmptyFDsEqClasses(op, ctx);
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 90c0067..2b5e569 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -595,6 +596,18 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
}
@Override
+ public Boolean visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException {
+ AbstractLogicalOperator argOperator = (AbstractLogicalOperator) arg;
+ if (argOperator.getOperatorTag() != LogicalOperatorTag.FORWARD) {
+ return Boolean.FALSE;
+ }
+ ForwardOperator otherOp = (ForwardOperator) copyAndSubstituteVar(op, arg);
+ ILogicalExpression rangeMapExp = op.getRangeMapExpression().getValue();
+ ILogicalExpression otherRangeMapExp = otherOp.getRangeMapExpression().getValue();
+ return rangeMapExp.equals(otherRangeMapExp) && op.getRangeMapKey().equals(otherOp.getRangeMapKey());
+ }
+
+ @Override
public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
return true;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 2caa252..742d485 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -284,6 +285,12 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException {
+ mapVariablesStandard(op, arg);
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index e0210cc..0196db6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -587,6 +588,14 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
}
@Override
+ public ILogicalOperator visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException {
+ ForwardOperator opCopy = new ForwardOperator(op.getRangeMapKey(),
+ exprDeepCopyVisitor.deepCopyExpressionReference(op.getRangeMapExpression()));
+ deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+ return opCopy;
+ }
+
+ @Override
public ILogicalOperator visitDelegateOperator(DelegateOperator op, ILogicalOperator arg)
throws AlgebricksException {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 6dfe254..7d3d676 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -30,11 +30,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractAssi
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -289,6 +290,12 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, IOptimizationContext arg) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, IOptimizationContext arg) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 0db0f74..c6f0c14 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -342,6 +343,11 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
}
@Override
+ public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ return new ForwardOperator(op.getRangeMapKey(), deepCopyExpressionRef(op.getRangeMapExpression()));
+ }
+
+ @Override
public ILogicalOperator visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
return new SinkOperator();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index c96276f..f36f604 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -32,11 +32,12 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -278,6 +279,11 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void,
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, IOptimizationContext arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, IOptimizationContext arg)
throws AlgebricksException {
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index ec96d48..5d0ef6a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -36,11 +36,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnne
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -278,6 +279,11 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
return null;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 59ccd84..70ccf6d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -318,6 +319,13 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ // only consider variables from the branch of the data source
+ VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), schemaVariables);
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
standardLayout(op);
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 3587e29..c62f555 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -460,6 +461,14 @@ public class SubstituteVariableVisitor
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+ throws AlgebricksException {
+ op.getRangeMapExpression().getValue().substituteVar(arg.first, arg.second);
+ substVarTypes(op, arg);
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index e66809e..2c68697 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -138,50 +139,43 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
switch (physOp.getOperatorTag()) {
case BROADCAST_EXCHANGE:
case ONE_TO_ONE_EXCHANGE:
- case RANDOM_MERGE_EXCHANGE: {
+ case RANDOM_MERGE_EXCHANGE:
+ case SEQUENTIAL_MERGE_EXCHANGE:
// No variables used.
break;
- }
- case HASH_PARTITION_EXCHANGE: {
- HashPartitionExchangePOperator concreteOp = (HashPartitionExchangePOperator) physOp;
- usedVariables.addAll(concreteOp.getHashFields());
+ case HASH_PARTITION_EXCHANGE:
+ HashPartitionExchangePOperator hashPartitionPOp = (HashPartitionExchangePOperator) physOp;
+ usedVariables.addAll(hashPartitionPOp.getHashFields());
break;
- }
- case HASH_PARTITION_MERGE_EXCHANGE: {
- HashPartitionMergeExchangePOperator concreteOp = (HashPartitionMergeExchangePOperator) physOp;
- usedVariables.addAll(concreteOp.getPartitionFields());
- for (OrderColumn orderCol : concreteOp.getOrderColumns()) {
+ case HASH_PARTITION_MERGE_EXCHANGE:
+ HashPartitionMergeExchangePOperator hashMergePOp = (HashPartitionMergeExchangePOperator) physOp;
+ usedVariables.addAll(hashMergePOp.getPartitionFields());
+ for (OrderColumn orderCol : hashMergePOp.getOrderColumns()) {
usedVariables.add(orderCol.getColumn());
}
break;
- }
- case SORT_MERGE_EXCHANGE: {
- SortMergeExchangePOperator concreteOp = (SortMergeExchangePOperator) physOp;
- for (OrderColumn orderCol : concreteOp.getSortColumns()) {
+ case SORT_MERGE_EXCHANGE:
+ SortMergeExchangePOperator sortMergePOp = (SortMergeExchangePOperator) physOp;
+ for (OrderColumn orderCol : sortMergePOp.getSortColumns()) {
usedVariables.add(orderCol.getColumn());
}
break;
- }
- case RANGE_PARTITION_EXCHANGE: {
- RangePartitionExchangePOperator concreteOp = (RangePartitionExchangePOperator) physOp;
- for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+ case RANGE_PARTITION_EXCHANGE:
+ RangePartitionExchangePOperator rangePartitionPOp = (RangePartitionExchangePOperator) physOp;
+ for (OrderColumn partCol : rangePartitionPOp.getPartitioningFields()) {
usedVariables.add(partCol.getColumn());
}
break;
- }
- case RANGE_PARTITION_MERGE_EXCHANGE: {
- RangePartitionMergeExchangePOperator concreteOp = (RangePartitionMergeExchangePOperator) physOp;
- for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+ case RANGE_PARTITION_MERGE_EXCHANGE:
+ RangePartitionMergeExchangePOperator rangeMergePOp = (RangePartitionMergeExchangePOperator) physOp;
+ for (OrderColumn partCol : rangeMergePOp.getPartitioningFields()) {
usedVariables.add(partCol.getColumn());
}
break;
- }
- case RANDOM_PARTITION_EXCHANGE: {
+ case RANDOM_PARTITION_EXCHANGE:
break;
- }
- default: {
+ default:
throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
- }
}
}
return null;
@@ -439,6 +433,12 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ op.getRangeMapExpression().getValue().getUsedVariables(usedVariables);
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Void arg) {
return null;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 78e96a4..0c08369 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -29,16 +30,20 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -67,16 +72,26 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat
}
@Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
- if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator sortOp,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext ctx) {
+ if (sortOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
if (orderProp == null) {
- computeLocalProperties(op);
+ computeLocalProperties(sortOp);
}
- StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(
- IPartitioningProperty.UNPARTITIONED, Collections.singletonList(orderProp)) };
- return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ StructuralPropertiesVector[] requiredProp = new StructuralPropertiesVector[1];
+ IPartitioningProperty partitioning;
+ INodeDomain targetNodeDomain = ctx.getComputationNodeDomain();
+ if (isFullParallel((AbstractLogicalOperator) sortOp, targetNodeDomain, ctx)) {
+ // partitioning requirement: input data is re-partitioned on sort columns (global ordering)
+ // TODO(ali): static range map implementation should be fixed to require ORDERED_PARTITION and come here
+ partitioning = new OrderedPartitionedProperty(Arrays.asList(sortColumns), targetNodeDomain);
+ } else {
+ // partitioning requirement: input data is unpartitioned (i.e. must be merged at one site)
+ partitioning = IPartitioningProperty.UNPARTITIONED;
+ }
+ // local requirement: each partition must be locally ordered
+ requiredProp[0] = new StructuralPropertiesVector(partitioning, Collections.singletonList(orderProp));
+ return new PhysicalRequirements(requiredProp, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
}
@@ -123,4 +138,27 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat
public boolean expensiveThanMaterialization() {
return true;
}
+
+ /**
+ * When true, the sort operator requires ORDERED_PARTITION (only applicable to dynamic version for now).
+ * Conditions:
+ * 1. Execution mode == partitioned
+ * 2. Dynamic range map was not disabled by some checks
+ * 3. User didn't disable it
+ * 4. User didn't provide static range map
+ * 5. Physical sort operator is not in-memory
+ * 6. There are at least two partitions in the cluster
+ * @param sortOp the sort operator
+ * @param clusterDomain the partitions specification of the cluster
+ * @param ctx optimization context
+ * @return true if the sort operator should be full parallel sort, false otherwise.
+ */
+ private boolean isFullParallel(AbstractLogicalOperator sortOp, INodeDomain clusterDomain,
+ IOptimizationContext ctx) {
+ return sortOp.getAnnotations().get(OperatorAnnotations.USE_DYNAMIC_RANGE) != Boolean.FALSE
+ && !sortOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)
+ && sortOp.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.STABLE_SORT
+ && clusterDomain.cardinality() != null && clusterDomain.cardinality() > 1
+ && ctx.getPhysicalOptimizationConfig().getSortParallel();
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
new file mode 100644
index 0000000..11c584e
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor;
+
+/**
+ * <pre>
+ * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}}
+ * idx0: Input data source --
+ * |-- forward op.
+ * idx1: RangeMap generator--
+ * </pre>
+ */
+public class ForwardPOperator extends AbstractPhysicalOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.FORWARD;
+ }
+
+ /**
+ * Forward operator requires that the global aggregate operator broadcasts the range map. No required properties at
+ * the data source input.
+ * @param op {@see {@link ForwardOperator}}
+ * @param requiredByParent parent's requirements, which are not enforced for now, as we only explore one plan
+ * @param context the optimization context
+ * @return broadcast requirement at input 1; empty requirements at input 0; No coordination between the two.
+ */
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) {
+ // broadcast the range map to the cluster node domain
+ INodeDomain targetDomain = context.getComputationNodeDomain();
+ List<ILocalStructuralProperty> noProp = new ArrayList<>();
+ StructuralPropertiesVector[] requiredAtInputs = new StructuralPropertiesVector[2];
+ requiredAtInputs[0] = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+ requiredAtInputs[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(targetDomain), noProp);
+ return new PhysicalRequirements(requiredAtInputs, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ /**
+ * Forward operator delivers whatever properties delivered by the input located at index = 0 (tuples source op).
+ * Subtree at index 0 must compute its delivered properties before any call to this method
+ * @param op forward logical operator
+ * @param context {@link IOptimizationContext}
+ * @throws AlgebricksException
+ */
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ ILogicalOperator dataSourceOperator = op.getInputs().get(0).getValue();
+ deliveredProperties = dataSourceOperator.getDeliveredPhysicalProperties().clone();
+ }
+
+ /**
+ * The output record descriptor of forward operator is the same as the output record descriptor of the data source
+ * which is located at index 0.
+ * @param builder Hyracks job builder
+ * @param context job generation context
+ * @param op {@see {@link ForwardOperator}}
+ * @param propagatedSchema not used
+ * @param inputSchemas schemas of all inputs
+ * @param outerPlanSchema not used
+ * @throws AlgebricksException
+ */
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ ForwardOperator forwardOp = (ForwardOperator) op;
+ RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()), inputSchemas[0], context);
+ ForwardOperatorDescriptor forwardDescriptor =
+ new ForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getRangeMapKey(), dataInputDescriptor);
+ builder.contributeHyracksOperator(forwardOp, forwardDescriptor);
+ ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(dataSource, 0, forwardOp, 0);
+ ILogicalOperator rangemapSource = forwardOp.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(rangemapSource, 0, forwardOp, 1);
+ }
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] outputDependencyLabels = new int[] { 1 };
+ int[] inputDependencyLabels = new int[] { 1, 0 };
+ return new Pair<>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index 6630d32..aeb9ac7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -42,27 +42,40 @@ import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirement
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.DynamicFieldRangePartitionComputerFactory;
import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
private List<OrderColumn> partitioningFields;
private INodeDomain domain;
- private IRangeMap rangeMap;
+ private RangeMap rangeMap;
+ private final boolean rangeMapIsComputedAtRunTime;
+ private final String rangeMapKeyInContext;
- public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
- IRangeMap rangeMap) {
+ private RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, RangeMap rangeMap,
+ boolean rangeMapIsComputedAtRunTime, String rangeMapKeyInContext) {
this.partitioningFields = partitioningFields;
this.domain = domain;
this.rangeMap = rangeMap;
+ this.rangeMapIsComputedAtRunTime = rangeMapIsComputedAtRunTime;
+ this.rangeMapKeyInContext = rangeMapKeyInContext;
+ }
+
+ public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, String rangeMapKeyInContext,
+ INodeDomain domain) {
+ this(partitioningFields, domain, null, true, rangeMapKeyInContext);
+ }
+
+ public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
+ RangeMap rangeMap) {
+ this(partitioningFields, domain, rangeMap, false, "");
}
@Override
@@ -80,8 +93,7 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p =
- new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
+ IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<>(partitioningFields), domain);
this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
}
@@ -97,32 +109,31 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
int n = partitioningFields.size();
int[] sortFields = new int[n];
IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
-
- INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
- INormalizedKeyComputerFactory nkcf = null;
-
IVariableTypeEnvironment env = context.getTypeEnvironment(op);
int i = 0;
for (OrderColumn oc : partitioningFields) {
LogicalVariable var = oc.getColumn();
sortFields[i] = opSchema.findVariable(var);
Object type = env.getVarType(var);
- OrderKind order = oc.getOrder();
- if (i == 0 && nkcfProvider != null && type != null) {
- nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
- }
IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
i++;
}
- ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
- IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
- return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ FieldRangePartitionComputerFactory partitionerFactory;
+ if (rangeMapIsComputedAtRunTime) {
+ partitionerFactory = new DynamicFieldRangePartitionComputerFactory(sortFields, comps, rangeMapKeyInContext,
+ op.getSourceLocation());
+ } else {
+ partitionerFactory = new StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ }
+
+ IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, partitionerFactory);
+ return new Pair<>(conn, null);
}
@Override
public String toString() {
- return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+ final String splitCount = rangeMap == null ? "" : " SPLIT COUNT:" + Integer.toString(rangeMap.getSplitCount());
+ return getOperatorTag().toString() + " " + partitioningFields + splitCount;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index ec32a53..b015193 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -53,18 +53,18 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator {
private List<OrderColumn> partitioningFields;
private INodeDomain domain;
- private IRangeMap rangeMap;
+ private RangeMap rangeMap;
public RangePartitionMergeExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
- IRangeMap rangeMap) {
+ RangeMap rangeMap) {
this.partitioningFields = partitioningFields;
this.domain = domain;
this.rangeMap = rangeMap;
@@ -143,7 +143,7 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
i++;
}
- ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ ITuplePartitionComputerFactory tpcf = new StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
new file mode 100644
index 0000000..df0b446
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor;
+
+public class SequentialMergeExchangePOperator extends AbstractExchangePOperator {
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.SEQUENTIAL_MERGE_EXCHANGE;
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ List<ILocalStructuralProperty> childLocalProps = childOp.getDeliveredPhysicalProperties().getLocalProperties();
+ List<ILocalStructuralProperty> localProperties;
+ if (childLocalProps != null) {
+ localProperties = new ArrayList<>(childLocalProps);
+ } else {
+ localProperties = new ArrayList<>(0);
+ }
+
+ deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, localProperties);
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, IHyracksJobBuilder.TargetConstraint> createConnectorDescriptor(
+ IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context)
+ throws AlgebricksException {
+ IConnectorDescriptor connector = new MToOneSequentialMergingConnectorDescriptor(spec);
+ return new Pair<>(connector, IHyracksJobBuilder.TargetConstraint.ONE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 99ed738..77f052e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -465,6 +466,13 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException {
+ addIndent(indent)
+ .append("forward: range-map = " + op.getRangeMapExpression().getValue().accept(exprVisitor, indent));
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("sink");
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index f1f1f3b..4a17cc6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -647,6 +648,14 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
}
@Override
+ public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException {
+ addIndent(indent).append("\"operator\": \"forward\"");
+ addIndent(indent).append("\"expressions\": \""
+ + op.getRangeMapExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("\"operator\": \"sink\"");
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java
index e7e98a5..b006a1e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java
@@ -121,20 +121,7 @@ public final class LocalOrderProperty implements ILocalStructuralProperty {
Iterator<OrderColumn> currentColumnIterator = orderColumns.iterator();
// Returns true if requiredColumnIterator is a prefix of currentColumnIterator.
- return isPrefixOf(requiredColumnIterator, currentColumnIterator);
- }
-
- private <T> boolean isPrefixOf(Iterator<T> requiredColumnIterator, Iterator<T> currentColumnIterator) {
- while (requiredColumnIterator.hasNext()) {
- T oc = requiredColumnIterator.next();
- if (!currentColumnIterator.hasNext()) {
- return false;
- }
- if (!oc.equals(currentColumnIterator.next())) {
- return false;
- }
- }
- return true;
+ return PropertiesUtil.isPrefixOf(requiredColumnIterator, currentColumnIterator);
}
// Gets normalized ordering columns, where each column variable is a representative variable of its equivalence
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index f2fed13..1c00e45 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -200,7 +200,7 @@ public class PropertiesUtil {
* @param target
* @return true iff pref is a prefix of target
*/
- private static <T> boolean isPrefixOf(Iterator<T> pref, Iterator<T> target) {
+ public static <T> boolean isPrefixOf(Iterator<T> pref, Iterator<T> target) {
while (pref.hasNext()) {
T v = pref.next();
if (!target.hasNext()) {
@@ -213,50 +213,65 @@ public class PropertiesUtil {
return true;
}
+ /**
+ * Normalizes or reduces the order columns argument based on the functional dependencies argument. The caller is
+ * responsible for taking caution as to how to handle the returned object since this method either returns the same
+ * object that is passed or returns a new object.
+ * @param orderColumns the order columns that are to be normalized
+ * @param functionalDependencies {@link FunctionalDependency}
+ * @return a new normalized object if normalization is applied. Otherwise, the same argument object is returned.
+ */
public static List<OrderColumn> applyFDsToOrderColumns(List<OrderColumn> orderColumns,
- List<FunctionalDependency> fds) {
- // the set of vars. is ordered
- // so we try the variables in order from last to first
- if (fds == null || fds.isEmpty()) {
+ List<FunctionalDependency> functionalDependencies) {
+ if (functionalDependencies == null || functionalDependencies.isEmpty()) {
return orderColumns;
}
+ // the set of vars. is ordered
+ // so we try the variables in order from last to first
int deleted = 0;
+ boolean[] removedColumns = new boolean[orderColumns.size()];
for (int i = orderColumns.size() - 1; i >= 0; i--) {
- for (FunctionalDependency fdep : fds) {
- if (impliedByPrefix(orderColumns, i, fdep)) {
- orderColumns.set(i, null);
+ for (FunctionalDependency functionalDependency : functionalDependencies) {
+ if (impliedByPrefix(orderColumns, i, functionalDependency)) {
+ removedColumns[i] = true;
deleted++;
break;
}
}
}
- List<OrderColumn> norm = new ArrayList<>(orderColumns.size() - deleted);
- for (OrderColumn oc : orderColumns) {
- if (oc != null) {
- norm.add(oc);
+ List<OrderColumn> normalizedColumns = new ArrayList<>(orderColumns.size() - deleted);
+ for (int i = 0; i < orderColumns.size(); i++) {
+ if (!removedColumns[i]) {
+ normalizedColumns.add(orderColumns.get(i));
}
}
- return norm;
+
+ return normalizedColumns;
}
+ /**
+ * Normalizes or reduces the order columns argument based on the equivalenceClasses argument. The caller is
+ * responsible for taking caution as to how to handle the returned object since this method either returns the same
+ * object that is passed or returns a new object.
+ * @param orderColumns the order columns that are to be normalized
+ * @param equivalenceClasses {@link EquivalenceClass}
+ * @return a new normalized object if normalization is applied. Otherwise, the same argument object is returned.
+ */
public static List<OrderColumn> replaceOrderColumnsByEqClasses(List<OrderColumn> orderColumns,
Map<LogicalVariable, EquivalenceClass> equivalenceClasses) {
if (equivalenceClasses == null || equivalenceClasses.isEmpty()) {
return orderColumns;
}
List<OrderColumn> norm = new ArrayList<>();
- for (OrderColumn v : orderColumns) {
- EquivalenceClass ec = equivalenceClasses.get(v.getColumn());
- if (ec == null) {
- norm.add(v);
- } else {
- if (ec.representativeIsConst()) {
- // trivially satisfied, so the var. can be removed
- } else {
- norm.add(new OrderColumn(ec.getVariableRepresentative(), v.getOrder()));
- }
+ for (OrderColumn orderColumn : orderColumns) {
+ EquivalenceClass columnEQClass = equivalenceClasses.get(orderColumn.getColumn());
+ if (columnEQClass == null) {
+ norm.add(orderColumn);
+ } else if (!columnEQClass.representativeIsConst()) {
+ norm.add(new OrderColumn(columnEQClass.getVariableRepresentative(), orderColumn.getOrder()));
}
+ // else columnEQClass rep. is constant, i.e. trivially satisfied, so the var. can be removed
}
return norm;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index deb98b0..548a29f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -27,6 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -124,4 +125,5 @@ public interface ILogicalOperatorVisitor<R, T> {
public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException;
+ public R visitForwardOperator(ForwardOperator op, T arg) throws AlgebricksException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
index 8779777..15bb54b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -25,4 +25,6 @@ public class AlgebricksConfig {
public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks";
public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME);
+ public static final int SORT_SAMPLES = 100;
+ public static final boolean SORT_PARALLEL = true;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
index 6fa378b..e3b1868 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPoli
import org.apache.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
import org.apache.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor;
public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
private static final long serialVersionUID = 1L;
@@ -33,7 +34,8 @@ public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignme
@Override
public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
int[] fanouts) {
- if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor
+ || c instanceof MToOneSequentialMergingConnectorDescriptor) {
return senderSideMaterializePolicy;
} else {
return pipeliningPolicy;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index 8eb9b90..a2a0ca1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -37,29 +37,20 @@ import org.apache.logging.log4j.Level;
public class HeuristicOptimizer {
- public static PhysicalOperatorTag[] hyracksOperators =
- new PhysicalOperatorTag[] { PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
- PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY,
- PhysicalOperatorTag.HDFS_READER, PhysicalOperatorTag.HYBRID_HASH_JOIN,
- PhysicalOperatorTag.IN_MEMORY_HASH_JOIN, PhysicalOperatorTag.NESTED_LOOP,
- PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY, PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY,
- PhysicalOperatorTag.REPLICATE, PhysicalOperatorTag.STABLE_SORT, PhysicalOperatorTag.UNION_ALL };
- public static PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
-
- public static boolean isHyracksOp(PhysicalOperatorTag opTag) {
- for (PhysicalOperatorTag t : hyracksOperators) {
- if (t == opTag) {
- return true;
- }
- }
- return false;
- }
-
private final IOptimizationContext context;
private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
private final ILogicalPlan plan;
+ private static final PhysicalOperatorTag[] hyracksOperators = new PhysicalOperatorTag[] {
+ PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
+ PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
+ PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
+ PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
+ PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.REPLICATE, PhysicalOperatorTag.STABLE_SORT,
+ PhysicalOperatorTag.UNION_ALL, PhysicalOperatorTag.FORWARD };
+ public static final PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
+
public HeuristicOptimizer(ILogicalPlan plan,
List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites,
List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites,
@@ -70,6 +61,15 @@ public class HeuristicOptimizer {
this.physicalRewrites = physicalRewrites;
}
+ public static boolean isHyracksOp(PhysicalOperatorTag opTag) {
+ for (PhysicalOperatorTag t : hyracksOperators) {
+ if (t == opTag) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public void optimize() throws AlgebricksException {
if (plan == null) {
return;
@@ -129,7 +129,6 @@ public class HeuristicOptimizer {
if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting physical optimizations.\n");
}
- // PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(plan);
runOptimizationSets(plan, physicalRewrites);
}