You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:41 UTC
[28/51] [partial] incubator-asterixdb-hyracks git commit: Change
folder structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
new file mode 100644
index 0000000..695078a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -0,0 +1,469 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Logical operator visitor that renames a variable inside a single operator.
+ * <p>
+ * The visitor argument is a pair of variables: occurrences of {@code pair.first} held directly by the
+ * visited operator (variable lists, expressions, ordering columns) are replaced by {@code pair.second}.
+ * Operator inputs are not visited here; nested plans of group-by and subplan operators are delegated to
+ * {@code OperatorManipulationUtil.substituteVarRec}.
+ * <p>
+ * When a typing context was supplied, most visit methods additionally ask the operator's output type
+ * environment to substitute the produced variable (see {@code substVarTypes}).
+ */
+public class SubstituteVariableVisitor implements ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> {
+
+    // Forwarded unchanged to OperatorManipulationUtil.substituteVarRec for nested plans.
+    private final boolean goThroughNts;
+    // May be null, in which case type environments are left untouched.
+    private final ITypingContext ctx;
+
+    /**
+     * @param goThroughNts
+     *            flag passed on to {@code OperatorManipulationUtil.substituteVarRec} when nested plans are
+     *            processed
+     * @param ctx
+     *            typing context used to update output type environments; {@code null} disables that update
+     */
+    public SubstituteVariableVisitor(boolean goThroughNts, ITypingContext ctx) {
+        this.goThroughNts = goThroughNts;
+        this.ctx = ctx;
+    }
+
+    /** Renames the variable in the aggregate's produced variables or, otherwise, in its expressions. */
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /**
+     * Renames the variable in the assign's produced variables or, otherwise, in its expressions, and also
+     * in the explicit ordering property when one is set.
+     */
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
+        // Substitute variables stored in ordering property
+        if (op.getExplicitOrderingProperty() != null) {
+            List<OrderColumn> orderColumns = op.getExplicitOrderingProperty().getOrderColumns();
+            for (int i = 0; i < orderColumns.size(); i++) {
+                OrderColumn oc = orderColumns.get(i);
+                if (oc.getColumn().equals(pair.first)) {
+                    orderColumns.set(i, new OrderColumn(pair.second, oc.getOrder()));
+                }
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the first matching scan variable and updates the type environment. */
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substProducedVariable(op.getVariables(), pair);
+        // The type environment is updated whether or not a produced variable was renamed, as in
+        // visitAssignOperator; returning early here would leave it keyed by the old variable.
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in every distinct-by expression. */
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> eRef : op.getExpressions()) {
+            eRef.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
+        // does not use any variable
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
+        // does not use any variable
+        return null;
+    }
+
+    /** Renames the variable in the group-by list, the decor list and all nested plans. */
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        subst(pair.first, pair.second, op.getGroupByList());
+        subst(pair.first, pair.second, op.getDecorList());
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), pair.first,
+                        pair.second, goThroughNts, ctx);
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in the join condition. */
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getCondition().getValue().substituteVar(pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in the join condition. */
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getCondition().getValue().substituteVar(pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in the max-objects expression and, when present, the offset expression. */
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getMaxObjects().getValue().substituteVar(pair.first, pair.second);
+        ILogicalExpression offset = op.getOffset().getValue();
+        if (offset != null) {
+            offset.substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** No-op: nothing is substituted for a nested tuple source. */
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        return null;
+    }
+
+    /** Renames the variable in every ordering expression. */
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) {
+            oe.second.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in every partitioning expression. */
+    @Override
+    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames every occurrence of the variable in the projection list. */
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> usedVariables = op.getVariables();
+        int n = usedVariables.size();
+        for (int i = 0; i < n; i++) {
+            if (sameVariable(usedVariables.get(i), pair.first)) {
+                usedVariables.set(i, pair.second);
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in the produced variables or, otherwise, in the expressions. */
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in the script's input and output variable lists. */
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substInArray(op.getInputVariables(), pair.first, pair.second);
+        substInArray(op.getOutputVariables(), pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in the select condition; the type environment is not touched here. */
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
+        op.getCondition().getValue().substituteVar(pair.first, pair.second);
+        return null;
+    }
+
+    /** Renames the variable in all nested plans; the subplan's own type environment is not touched here. */
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), pair.first,
+                        pair.second, goThroughNts, ctx);
+            }
+        }
+        return null;
+    }
+
+    /** Renames the variable in any position of every variable-mapping triple. */
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = op.getVariableMappings();
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : varMap) {
+            if (t.first.equals(pair.first)) {
+                t.first = pair.second;
+            }
+            if (t.second.equals(pair.first)) {
+                t.second = pair.second;
+            }
+            if (t.third.equals(pair.first)) {
+                t.third = pair.second;
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /**
+     * Renames the first matching produced variable; only when none matches is the unnest expression
+     * rewritten. The type environment is updated in both cases.
+     */
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        if (!substProducedVariable(op.getVariables(), pair)) {
+            op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /**
+     * Renames the first matching produced variable; only when none matches is the unnest expression
+     * rewritten. The type environment is updated in both cases.
+     */
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        if (!substProducedVariable(op.getVariables(), pair)) {
+            op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in every written expression. */
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in every distributed expression. */
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in the payload expression and in every key expression. */
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
+        for (Mutable<ILogicalExpression> e : op.getKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /**
+     * Walks a list of (variable, expression) pairs: a pair whose variable equals {@code v1} gets its
+     * variable replaced by {@code v2}; for the pairs before it, the expression is rewritten instead.
+     */
+    private void subst(LogicalVariable v1, LogicalVariable v2,
+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> varExprPairList) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : varExprPairList) {
+            if (ve.first != null && ve.first.equals(v1)) {
+                ve.first = v2;
+                // NOTE(review): stops at the first matching pair, so expressions of later pairs are
+                // not rewritten - confirm this is intended before changing it.
+                return;
+            }
+            ve.second.getValue().substituteVar(v1, v2);
+        }
+    }
+
+    /** Replaces every element of {@code varArray} that equals {@code v1} with {@code v2}. */
+    private void substInArray(ArrayList<LogicalVariable> varArray, LogicalVariable v1, LogicalVariable v2) {
+        for (int i = 0; i < varArray.size(); i++) {
+            if (sameVariable(varArray.get(i), v1)) {
+                varArray.set(i, v2);
+            }
+        }
+    }
+
+    /**
+     * Shared loop for assign-like operators: position {@code i} either has its produced variable renamed
+     * or, when the variable does not match, has its expression rewritten.
+     */
+    private void substAssignVariables(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions,
+            Pair<LogicalVariable, LogicalVariable> pair) {
+        int n = variables.size();
+        for (int i = 0; i < n; i++) {
+            if (sameVariable(variables.get(i), pair.first)) {
+                variables.set(i, pair.second);
+            } else {
+                expressions.get(i).getValue().substituteVar(pair.first, pair.second);
+            }
+        }
+    }
+
+    /**
+     * Replaces the first element of {@code variables} equal to {@code pair.first} with {@code pair.second}.
+     *
+     * @return {@code true} when an element was replaced
+     */
+    private boolean substProducedVariable(List<LogicalVariable> variables, Pair<LogicalVariable, LogicalVariable> pair) {
+        for (int i = 0; i < variables.size(); i++) {
+            if (sameVariable(variables.get(i), pair.first)) {
+                variables.set(i, pair.second);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Null-safe variable comparison: true for the same instance (so everything a plain reference
+     * comparison accepted still matches) and for instances that are {@code equals}.
+     */
+    private static boolean sameVariable(LogicalVariable a, LogicalVariable b) {
+        return a == b || (a != null && a.equals(b));
+    }
+
+    /** Delegates the substitution to the replicate operator itself. */
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        op.substituteVar(arg.first, arg.second);
+        return null;
+    }
+
+    /** No-op: nothing is substituted for a materialize operator. */
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+    /** Renames the variable in the payload expression and in every primary-key expression. */
+    @Override
+    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in every primary-key and secondary-key expression. */
+    @Override
+    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** Renames the variable in every primary-key and secondary-key expression. */
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    /** No-op: nothing is substituted for a sink operator. */
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        return null;
+    }
+
+    /**
+     * Asks the operator's output type environment to substitute the produced variable
+     * {@code arg.first} with {@code arg.second}. Does nothing when no typing context was supplied.
+     */
+    private void substVarTypes(ILogicalOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        if (ctx == null) {
+            return;
+        }
+        IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op);
+        env.substituteProducedVariable(arg.first, arg.second);
+    }
+
+    /** No-op: nothing is substituted for an extension operator. */
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+    /**
+     * Renames the first matching produced variable; only when none matches is the lookup expression
+     * rewritten. The type environment is updated in both cases.
+     */
+    @Override
+    public Void visitExternalDataLookupOperator(ExternalDataLookupOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
+        if (!substProducedVariable(op.getVariables(), pair)) {
+            op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
new file mode 100644
index 0000000..8169ad0
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -0,0 +1,407 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Visitor that gathers the variables an operator reads into a caller-supplied collection.
+ * <p>
+ * Every visit method adds to the collection handed to the constructor and returns {@code null}.
+ */
+public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> {
+
+    // Accumulator owned by the caller; it is only ever added to.
+    private Collection<LogicalVariable> usedVariables;
+
+    /**
+     * @param usedVariables
+     *            collection that receives the used variables
+     */
+    public UsedVariableVisitor(Collection<LogicalVariable> usedVariables) {
+        this.usedVariables = usedVariables;
+    }
+
+    /** Adds the variables used by the referenced expression to the accumulator. */
+    private void collectFrom(Mutable<ILogicalExpression> exprRef) {
+        exprRef.getValue().getUsedVariables(usedVariables);
+    }
+
+    /** Adds {@code var} to the accumulator unless it is already contained. */
+    private void addIfAbsent(LogicalVariable var) {
+        if (!usedVariables.contains(var)) {
+            usedVariables.add(var);
+        }
+    }
+
+    /** Collects the variables used by the aggregate expressions. */
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> aggExpr : op.getExpressions()) {
+            collectFrom(aggExpr);
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the assigned expressions. */
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> assignExpr : op.getExpressions()) {
+            collectFrom(assignExpr);
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the additional filtering expressions, when there are any. */
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) {
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> filterExpr : op.getAdditionalFilteringExpressions()) {
+                collectFrom(filterExpr);
+            }
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the distinct-by expressions. */
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> distinctExpr : op.getExpressions()) {
+            collectFrom(distinctExpr);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) {
+        // nothing to collect: no variable is used
+        return null;
+    }
+
+    /**
+     * Collects the variables used by the exchange's physical operator, if one is set.
+     *
+     * @throws AlgebricksException
+     *             when the physical operator tag is not one of the handled exchange kinds
+     */
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        // Which variables are used is decided by the physical operator; without one, nothing is added.
+        IPhysicalOperator physOp = op.getPhysicalOperator();
+        if (physOp == null) {
+            return null;
+        }
+        switch (physOp.getOperatorTag()) {
+            case BROADCAST_EXCHANGE:
+            case ONE_TO_ONE_EXCHANGE:
+            case RANDOM_MERGE_EXCHANGE:
+                // These exchanges use no variables.
+                break;
+            case HASH_PARTITION_EXCHANGE:
+                usedVariables.addAll(((HashPartitionExchangePOperator) physOp).getHashFields());
+                break;
+            case HASH_PARTITION_MERGE_EXCHANGE: {
+                HashPartitionMergeExchangePOperator hashMerge = (HashPartitionMergeExchangePOperator) physOp;
+                usedVariables.addAll(hashMerge.getPartitionFields());
+                for (OrderColumn column : hashMerge.getOrderColumns()) {
+                    usedVariables.add(column.getColumn());
+                }
+                break;
+            }
+            case SORT_MERGE_EXCHANGE: {
+                SortMergeExchangePOperator sortMerge = (SortMergeExchangePOperator) physOp;
+                for (OrderColumn column : sortMerge.getSortColumns()) {
+                    usedVariables.add(column.getColumn());
+                }
+                break;
+            }
+            case RANGE_PARTITION_EXCHANGE: {
+                RangePartitionPOperator rangePartition = (RangePartitionPOperator) physOp;
+                for (OrderColumn column : rangePartition.getPartitioningFields()) {
+                    usedVariables.add(column.getColumn());
+                }
+                break;
+            }
+            case RANGE_PARTITION_MERGE_EXCHANGE: {
+                RangePartitionMergePOperator rangeMerge = (RangePartitionMergePOperator) physOp;
+                for (OrderColumn column : rangeMerge.getPartitioningFields()) {
+                    usedVariables.add(column.getColumn());
+                }
+                break;
+            }
+            default:
+                throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
+        }
+        return null;
+    }
+
+    /** Collects from the nested plans first, then from the group-by list, then from the decor list. */
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        for (ILogicalPlan nestedPlan : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+                VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootRef.getValue(), usedVariables);
+            }
+        }
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> groupByEntry : op.getGroupByList()) {
+            collectFrom(groupByEntry.second);
+        }
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorEntry : op.getDecorList()) {
+            collectFrom(decorEntry.second);
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the join condition. */
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) {
+        collectFrom(op.getCondition());
+        return null;
+    }
+
+    /** Collects the variables used by the join condition. */
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) {
+        collectFrom(op.getCondition());
+        return null;
+    }
+
+    /** Collects from the max-objects expression and, when an offset expression is set, from it too. */
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Void arg) {
+        collectFrom(op.getMaxObjects());
+        ILogicalExpression offset = op.getOffset().getValue();
+        if (offset != null) {
+            offset.getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) {
+        // nothing to collect: no variable is used
+        return null;
+    }
+
+    /** Collects the variables used by the ordering expressions. */
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Void arg) {
+        for (Pair<IOrder, Mutable<ILogicalExpression>> orderEntry : op.getOrderExpressions()) {
+            collectFrom(orderEntry.second);
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the partitioning expressions. */
+    @Override
+    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> splitExpr : op.getExpressions()) {
+            collectFrom(splitExpr);
+        }
+        return null;
+    }
+
+    /** Adds each projected variable that is not already in the accumulator. */
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Void arg) {
+        for (LogicalVariable projected : op.getVariables()) {
+            addIfAbsent(projected);
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the running-aggregate expressions. */
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> runningExpr : op.getExpressions()) {
+            collectFrom(runningExpr);
+        }
+        return null;
+    }
+
+    /** Adds each script input variable that is not already in the accumulator. */
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Void arg) {
+        for (LogicalVariable input : op.getInputVariables()) {
+            addIfAbsent(input);
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the select condition. */
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Void arg) {
+        collectFrom(op.getCondition());
+        return null;
+    }
+
+    /** Collects the variables used anywhere below the roots of the nested plans. */
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        for (ILogicalPlan nestedPlan : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+                VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootRef.getValue(), usedVariables);
+            }
+        }
+        return null;
+    }
+
+    /** Adds the first and second variable of every mapping triple, skipping ones already present. */
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Void arg) {
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> mapping : op.getVariableMappings()) {
+            addIfAbsent(mapping.first);
+            addIfAbsent(mapping.second);
+        }
+        return null;
+    }
+
+    /** Collects from the unnest expression and from the additional filtering expressions, if any. */
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) {
+        collectFrom(op.getExpressionRef());
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> filterExpr : op.getAdditionalFilteringExpressions()) {
+                collectFrom(filterExpr);
+            }
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the unnest expression. */
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Void arg) {
+        collectFrom(op.getExpressionRef());
+        return null;
+    }
+
+    /** Collects the variables used by the written expressions. */
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> writeExpr : op.getExpressions()) {
+            collectFrom(writeExpr);
+        }
+        return null;
+    }
+
+    /** Collects the variables used by the distributed expressions. */
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> resultExpr : op.getExpressions()) {
+            collectFrom(resultExpr);
+        }
+        return null;
+    }
+
+    /** Collects from the payload, the key expressions and the additional filtering expressions, if any. */
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, Void arg) {
+        collectFrom(op.getPayloadExpression());
+        for (Mutable<ILogicalExpression> keyExpr : op.getKeyExpressions()) {
+            collectFrom(keyExpr);
+        }
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> filterExpr : op.getAdditionalFilteringExpressions()) {
+                collectFrom(filterExpr);
+            }
+        }
+        return null;
+    }
+
+    /** Collects from the payload, the primary keys and the additional filtering expressions, if any. */
+    @Override
+    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) {
+        collectFrom(op.getPayloadExpression());
+        for (Mutable<ILogicalExpression> keyExpr : op.getPrimaryKeyExpressions()) {
+            collectFrom(keyExpr);
+        }
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> filterExpr : op.getAdditionalFilteringExpressions()) {
+                collectFrom(filterExpr);
+            }
+        }
+        return null;
+    }
+
+    /** Collects from the primary keys, the secondary keys and the additional filtering expressions, if any. */
+    @Override
+    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> primaryKey : op.getPrimaryKeyExpressions()) {
+            collectFrom(primaryKey);
+        }
+        for (Mutable<ILogicalExpression> secondaryKey : op.getSecondaryKeyExpressions()) {
+            collectFrom(secondaryKey);
+        }
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> filterExpr : op.getAdditionalFilteringExpressions()) {
+                collectFrom(filterExpr);
+            }
+        }
+        return null;
+    }
+
+    /** Collects from the primary-key and secondary-key expressions. */
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> primaryKey : op.getPrimaryKeyExpressions()) {
+            collectFrom(primaryKey);
+        }
+        for (Mutable<ILogicalExpression> secondaryKey : op.getSecondaryKeyExpressions()) {
+            collectFrom(secondaryKey);
+        }
+        return null;
+    }
+
+    /** Adds nothing for a sink operator. */
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Void arg) {
+        return null;
+    }
+
+    /** Adds nothing for a replicate operator. */
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    /** Adds nothing for a materialize operator. */
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    /** Lets the extension's delegate report its used variables. */
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        op.getDelegate().getUsedVariables(usedVariables);
+        return null;
+    }
+
+    /** Collects the variables used by the lookup expression. */
+    @Override
+    public Void visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg) throws AlgebricksException {
+        collectFrom(op.getExpressionRef());
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
new file mode 100644
index 0000000..f3cf0a4
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Static helpers that run the variable-related operator visitors (used, produced, schema and
+ * substitution) over a logical operator, optionally walking down through its inputs.
+ */
+public class VariableUtilities {
+
+    /**
+     * Runs a {@link UsedVariableVisitor} over {@code op}, collecting into {@code usedVariables}.
+     */
+    public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> usedCollector = new UsedVariableVisitor(usedVariables);
+        op.accept(usedCollector, null);
+    }
+
+    /**
+     * Runs a {@link ProducedVariableVisitor} over {@code op}, collecting into {@code producedVariables}.
+     */
+    public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> producedCollector = new ProducedVariableVisitor(producedVariables);
+        op.accept(producedCollector, null);
+    }
+
+    /**
+     * Runs a {@link SchemaVariableVisitor} over {@code op}, collecting into {@code schemaVariables}.
+     */
+    public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> schemaCollector = new SchemaVariableVisitor(schemaVariables);
+        op.accept(schemaCollector, null);
+    }
+
+    /**
+     * Collects the used variables of {@code op} and then, recursively and in input order, those of
+     * every operator reachable through its inputs (the operator itself is handled before its inputs).
+     */
+    public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
+            throws AlgebricksException {
+        getUsedVariables(op, vars);
+        for (Mutable<ILogicalOperator> inputRef : op.getInputs()) {
+            getUsedVariablesInDescendantsAndSelf(inputRef.getValue(), vars);
+        }
+    }
+
+    /**
+     * Collects the produced variables of {@code op} and then, recursively and in input order, those
+     * of every operator reachable through its inputs (the operator itself is handled before its inputs).
+     */
+    public static void getProducedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
+            throws AlgebricksException {
+        getProducedVariables(op, vars);
+        for (Mutable<ILogicalOperator> inputRef : op.getInputs()) {
+            getProducedVariablesInDescendantsAndSelf(inputRef.getValue(), vars);
+        }
+    }
+
+    /**
+     * Same as {@link #substituteVariables(ILogicalOperator, LogicalVariable, LogicalVariable, boolean, ITypingContext)}
+     * with {@code goThroughNts} set to {@code true}.
+     */
+    public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
+            ITypingContext ctx) throws AlgebricksException {
+        final boolean goThroughNts = true;
+        substituteVariables(op, v1, v2, goThroughNts, ctx);
+    }
+
+    /**
+     * Applies the substitution to every operator reachable through the inputs of {@code op} first,
+     * and to {@code op} itself last.
+     */
+    public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
+            LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> inputRef : op.getInputs()) {
+            ILogicalOperator input = inputRef.getValue();
+            substituteVariablesInDescendantsAndSelf(input, v1, v2, ctx);
+        }
+        // The four-argument overload passes goThroughNts = true, exactly as required here.
+        substituteVariables(op, v1, v2, ctx);
+    }
+
+    /**
+     * Runs a {@link SubstituteVariableVisitor} over {@code op}, handing it the pair
+     * {@code (v1, v2)} as the visitor argument.
+     */
+    public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
+            boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
+        Pair<LogicalVariable, LogicalVariable> substitution = new Pair<LogicalVariable, LogicalVariable>(v1, v2);
+        ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> substituter = new SubstituteVariableVisitor(
+                goThroughNts, ctx);
+        op.accept(substituter, substitution);
+    }
+
+    /**
+     * Tells whether the two lists contain the same set of elements, ignoring both order and
+     * duplicates.
+     */
+    public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {
+        Set<T> distinctVars = new HashSet<T>(var);
+        Set<T> distinctVarArgs = new HashSet<T>(varArg);
+        return distinctVars.equals(distinctVarArgs);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
new file mode 100644
index 0000000..42d964d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+
+/**
+ * Base class for exchange physical operators: job generation contributes a connector (built by
+ * the subclass) together with one graph edge from the operator's first input.
+ */
+public abstract class AbstractExchangePOperator extends AbstractPhysicalOperator {
+
+    /**
+     * Creates the connector through {@link #createConnectorDescriptor}, registers it with its
+     * target constraint on {@code builder}, and then links input 0 of {@code op} to {@code op}.
+     */
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        final IConnectorDescriptorRegistry jobSpec = builder.getJobSpec();
+        final Pair<IConnectorDescriptor, TargetConstraint> connectorAndConstraint = createConnectorDescriptor(jobSpec,
+                op, opSchema, context);
+        builder.contributeConnectorWithTargetConstraint(op, connectorAndConstraint.first,
+                connectorAndConstraint.second);
+
+        final ILogicalOperator inputOp = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(inputOp, 0, op, 0);
+    }
+
+    /** Always {@code false} for exchange operators. */
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    /** Always {@code false} for exchange operators. */
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+    /**
+     * Builds the connector descriptor for {@code op} and the target constraint it is contributed
+     * with.
+     */
+    public abstract Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(
+            IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context)
+            throws AlgebricksException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
new file mode 100644
index 0000000..764159d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+
+/**
+ * Base class for hash-join physical operators. It keeps the join-key variables of both branches
+ * and derives the delivered properties and the partitioning requirements for the two inputs from
+ * them.
+ */
+public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
+
+    /** Key variables passed to the constructor as the left side of the equalities. */
+    protected List<LogicalVariable> keysLeftBranch;
+    /** Key variables passed to the constructor as the right side of the equalities. */
+    protected List<LogicalVariable> keysRightBranch;
+
+    public AbstractHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities) {
+        super(kind, partitioningType);
+        this.keysLeftBranch = sideLeftOfEqualities;
+        this.keysRightBranch = sideRightOfEqualities;
+    }
+
+    public List<LogicalVariable> getKeysLeftBranch() {
+        return keysLeftBranch;
+    }
+
+    public List<LogicalVariable> getKeysRightBranch() {
+        return keysRightBranch;
+    }
+
+    /**
+     * Sets {@code deliveredProperties}. For a PARTITIONED operator the partitioning is the one
+     * delivered by the first input, or {@code null} when either input has no delivered properties;
+     * otherwise it is {@link IPartitioningProperty#UNPARTITIONED}. Local properties come from
+     * {@link #deliveredLocalProperties}.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context)
+            throws AlgebricksException {
+        IPartitioningProperty pp;
+        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+            AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+            IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties();
+            AbstractLogicalOperator op1 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
+            IPhysicalPropertiesVector pv1 = op1.getPhysicalOperator().getDeliveredProperties();
+
+            if (pv0 == null || pv1 == null) {
+                pp = null;
+            } else {
+                pp = pv0.getPartitioningProperty();
+            }
+        } else {
+            pp = IPartitioningProperty.UNPARTITIONED;
+        }
+        this.deliveredProperties = new StructuralPropertiesVector(pp, deliveredLocalProperties(iop, context));
+    }
+
+    /**
+     * Builds the requirements for the two inputs. For a PARTITIONED operator, PAIRWISE requires
+     * each input to be partitioned on its own key variables and BROADCAST only requires the second
+     * input to be broadcast; for any other execution mode both partitioning requirements stay
+     * {@code null}. The coordinator is the equivalence-class coordinator for INNER joins and a
+     * dedicated one for LEFT_OUTER joins.
+     *
+     * @throws IllegalStateException
+     *             for an unexpected partitioning type or join kind
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
+            IPhysicalPropertiesVector reqdByParent) {
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+        // In a cost-based optimizer, we would also try to propagate the
+        // parent's partitioning requirements.
+        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+        IPartitioningProperty pp1 = null;
+        IPartitioningProperty pp2 = null;
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+            switch (partitioningType) {
+                case PAIRWISE: {
+                    pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch), null);
+                    pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch), null);
+                    break;
+                }
+                case BROADCAST: {
+                    pp2 = new BroadcastPartitioningProperty(null);
+                    break;
+                }
+                default: {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+
+        pv[0] = new StructuralPropertiesVector(pp1, null);
+        pv[1] = new StructuralPropertiesVector(pp2, null);
+
+        IPartitioningRequirementsCoordinator prc;
+        switch (kind) {
+            case INNER: {
+                prc = IPartitioningRequirementsCoordinator.EQCLASS_PARTITIONING_COORDINATOR;
+                break;
+            }
+            case LEFT_OUTER: {
+                prc = new IPartitioningRequirementsCoordinator() {
+
+                    /**
+                     * When the first input already delivers a partitioning of the same type as the
+                     * requirement, rewrites an unordered-partitioned requirement so that it only
+                     * keeps the right-branch variables whose left-branch counterparts cover the
+                     * delivered column set. Otherwise the requirement is returned unchanged.
+                     */
+                    @Override
+                    public Pair<Boolean, IPartitioningProperty> coordinateRequirements(
+                            IPartitioningProperty requirements, IPartitioningProperty firstDeliveredPartitioning,
+                            ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
+                        // The requirement is null whenever the join is not PARTITIONED (pp1/pp2 are
+                        // never assigned above), so it must be checked before being dereferenced.
+                        if (firstDeliveredPartitioning != null
+                                && requirements != null
+                                && firstDeliveredPartitioning.getPartitioningType() == requirements
+                                        .getPartitioningType()) {
+                            switch (requirements.getPartitioningType()) {
+                                case UNORDERED_PARTITIONED: {
+                                    UnorderedPartitionedProperty upp1 = (UnorderedPartitionedProperty) firstDeliveredPartitioning;
+                                    Set<LogicalVariable> set1 = upp1.getColumnSet();
+                                    UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) requirements;
+                                    Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
+                                    Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
+                                    Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+                                    for (LogicalVariable r : uppreq.getColumnSet()) {
+                                        // Locate the position j of the right-branch key that is r
+                                        // itself or shares its equivalence class.
+                                        EquivalenceClass ecSnd = eqmap.get(r);
+                                        boolean found = false;
+                                        int j = 0;
+                                        for (LogicalVariable rvar : keysRightBranch) {
+                                            if (rvar == r || ecSnd != null && eqmap.get(rvar) == ecSnd) {
+                                                found = true;
+                                                break;
+                                            }
+                                            j++;
+                                        }
+                                        if (!found) {
+                                            throw new IllegalStateException("Did not find a variable equivalent to "
+                                                    + r + " among " + keysRightBranch);
+                                        }
+                                        // The left-branch key at the same position is the join
+                                        // partner; keep r only if the first input's delivered
+                                        // columns contain that partner (or an equivalent variable).
+                                        LogicalVariable v2 = keysLeftBranch.get(j);
+                                        EquivalenceClass ecFst = eqmap.get(v2);
+                                        for (LogicalVariable vset1 : set1) {
+                                            if (vset1 == v2 || ecFst != null && eqmap.get(vset1) == ecFst) {
+                                                covered.add(vset1);
+                                                modifuppreq.add(r);
+                                                break;
+                                            }
+                                        }
+                                    }
+                                    // Every delivered column must have been matched by some key.
+                                    if (!covered.equals(set1)) {
+                                        throw new AlgebricksException("Could not modify " + requirements
+                                                + " to agree with partitioning property " + firstDeliveredPartitioning
+                                                + " delivered by previous input operator.");
+                                    }
+                                    UnorderedPartitionedProperty upp2 = new UnorderedPartitionedProperty(modifuppreq,
+                                            requirements.getNodeDomain());
+                                    return new Pair<Boolean, IPartitioningProperty>(false, upp2);
+                                }
+                                case ORDERED_PARTITIONED: {
+                                    throw new NotImplementedException();
+                                }
+                            }
+                        }
+                        return new Pair<Boolean, IPartitioningProperty>(true, requirements);
+                    }
+                };
+                break;
+            }
+            default: {
+                throw new IllegalStateException();
+            }
+        }
+
+        return new PhysicalRequirements(pv, prc);
+    }
+
+    /**
+     * Supplies the local structural properties used by {@link #computeDeliveredProperties}.
+     */
+    protected abstract List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op,
+            IOptimizationContext context) throws AlgebricksException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
new file mode 100644
index 0000000..fcc04ab
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+
+/**
+ * Base class for join physical operators; it stores the join kind and the partitioning type the
+ * operator was created with.
+ */
+public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
+
+    /** Partitioning strategies a join operator can be configured with. */
+    public enum JoinPartitioningType {
+        PAIRWISE,
+        BROADCAST
+    }
+
+    protected final JoinKind kind;
+    protected final JoinPartitioningType partitioningType;
+
+    public AbstractJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType) {
+        this.partitioningType = partitioningType;
+        this.kind = kind;
+    }
+
+    /** @return the join kind given at construction time */
+    public JoinKind getKind() {
+        return this.kind;
+    }
+
+    /** @return the partitioning type given at construction time */
+    public JoinPartitioningType getPartitioningType() {
+        return this.partitioningType;
+    }
+
+    /**
+     * Labels the first input with 1, the second input with 0, and the single output with 1.
+     */
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        return new Pair<int[], int[]>(new int[] { 1, 0 }, new int[] { 1 });
+    }
+
+    /** Always {@code true} for join operators. */
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
new file mode 100644
index 0000000..66e7b98
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.PlanCompiler;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * Common base of the physical operators: holds the delivered properties, the job-generation
+ * cut-off flag and an opaque host query context, and offers helpers for contributing operator
+ * descriptors and compiling nested plans into pipelines.
+ */
+public abstract class AbstractPhysicalOperator implements IPhysicalOperator {
+
+    protected IPhysicalPropertiesVector deliveredProperties;
+    private boolean disableJobGenBelow = false;
+    private Object hostQueryContext;
+
+    @Override
+    public final IPhysicalPropertiesVector getDeliveredProperties() {
+        return this.deliveredProperties;
+    }
+
+    /** Renders the operator as the string form of its operator tag. */
+    @Override
+    public String toString() {
+        return getOperatorTag().toString();
+    }
+
+    public void setHostQueryContext(Object context) {
+        hostQueryContext = context;
+    }
+
+    public Object getHostQueryContext() {
+        return this.hostQueryContext;
+    }
+
+    /**
+     * @return requirements for a single child: the empty properties vector, with no coordination
+     */
+    protected PhysicalRequirements emptyUnaryRequirements() {
+        StructuralPropertiesVector[] singleChild = { StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
+        return new PhysicalRequirements(singleChild, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    /**
+     * @return requirements holding the empty properties vector for each of the
+     *         {@code numberOfChildren} children, with no coordination
+     */
+    protected PhysicalRequirements emptyUnaryRequirements(int numberOfChildren) {
+        StructuralPropertiesVector[] perChild = new StructuralPropertiesVector[numberOfChildren];
+        for (int child = 0; child < perChild.length; child++) {
+            perChild[child] = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+        }
+        return new PhysicalRequirements(perChild, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void disableJobGenBelowMe() {
+        disableJobGenBelow = true;
+    }
+
+    @Override
+    public boolean isJobGenDisabledBelowMe() {
+        return this.disableJobGenBelow;
+    }
+
+    /**
+     * @return labels (0 or 1) for each input and output indicating the dependency between them.
+     *         The edges labeled as 1 must wait for the edges with label 0. This default labels
+     *         every input and the single output with 0.
+     */
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        // A freshly allocated int[] is zero-filled, so every input gets label 0.
+        int[] inputLabels = new int[op.getInputs().size()];
+        return new Pair<int[], int[]>(inputLabels, new int[] { 0 });
+    }
+
+    /**
+     * Contributes {@code opDesc} for {@code op}; an UNPARTITIONED operator first gets a count
+     * partition constraint of 1.
+     */
+    protected void contributeOpDesc(IHyracksJobBuilder builder, AbstractLogicalOperator op, IOperatorDescriptor opDesc) {
+        final boolean unpartitioned = op.getExecutionMode() == ExecutionMode.UNPARTITIONED;
+        if (unpartitioned) {
+            AlgebricksPartitionConstraint singlePartition = new AlgebricksCountPartitionConstraint(1);
+            builder.contributeAlgebricksPartitionConstraint(opDesc, singlePartition);
+        }
+        builder.contributeHyracksOperator(op, opDesc);
+    }
+
+    /**
+     * Compiles each nested plan of {@code npOp} into a pipeline, in the order the plans are listed.
+     */
+    protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema,
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context)
+            throws AlgebricksException {
+        AlgebricksPipeline[] pipelines = new AlgebricksPipeline[npOp.getNestedPlans().size()];
+        PlanCompiler compiler = new PlanCompiler(context);
+        int next = 0;
+        for (ILogicalPlan nestedPlan : npOp.getNestedPlans()) {
+            pipelines[next] = buildPipelineWithProjection(nestedPlan, outerPlanSchema, npOp, opSchema, compiler);
+            next++;
+        }
+        return pipelines;
+    }
+
+    /**
+     * Compiles one nested plan, adds the variables of its root operator's schema to
+     * {@code opSchema}, and returns the pipeline of the single meta operator descriptor the
+     * compiled job must consist of.
+     */
+    private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema,
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
+        if (p.getRoots().size() > 1) {
+            throw new NotImplementedException("Nested plans with several roots are not supported.");
+        }
+        JobSpecification compiledJob = pc.compilePlan(p, outerPlanSchema, null);
+        ILogicalOperator subplanRoot = p.getRoots().get(0).getValue();
+        opSchema.addAllVariables(pc.getContext().getSchema(subplanRoot));
+
+        Map<OperatorDescriptorId, IOperatorDescriptor> descriptors = compiledJob.getOperatorMap();
+        if (descriptors.size() != 1) {
+            throw new AlgebricksException(
+                    "Attempting to construct a nested plan with "
+                            + descriptors.size()
+                            + " operator descriptors. Currently, nested plans can only consist in linear pipelines of Asterix micro operators.");
+        }
+
+        // Exactly one entry exists at this point, so take it directly.
+        IOperatorDescriptor onlyDescriptor = descriptors.values().iterator().next();
+        if (!(onlyDescriptor instanceof AlgebricksMetaOperatorDescriptor)) {
+            throw new AlgebricksException(
+                    "Can only generate Hyracks jobs for pipelinable Asterix nested plans, not for "
+                            + onlyDescriptor.getClass().getName());
+        }
+        // we suppose that the top operator in the subplan already does the
+        // projection for us
+        return ((AlgebricksMetaOperatorDescriptor) onlyDescriptor).getPipeline();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
new file mode 100644
index 0000000..41711cb
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+
+public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysicalOperator {
+
+ protected List<LogicalVariable> columnList;
+
+ /**
+  * Creates the operator for the given group-by columns.
+  * The list is stored as-is (no copy is made).
+  *
+  * @param columnList
+  *            the columns this operator groups on
+  */
+ public AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+     this.columnList = columnList;
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + columnList;
+ }
+
+ /**
+  * @return the group-by column list held by this operator (the same list
+  *         instance, not a copy)
+  */
+ public List<LogicalVariable> getGbyColumns() {
+     return this.columnList;
+ }
+
+ /**
+  * Replaces the group-by column list held by this operator.
+  *
+  * @param gByColumns
+  *            the new column list; it is stored as-is (no copy is made)
+  */
+ public void setGbyColumns(List<LogicalVariable> gByColumns) {
+     columnList = gByColumns;
+ }
+
+ /**
+  * Derives the properties delivered by this group-by from the properties
+  * delivered by its first input.
+  * <p>
+  * The input's partitioning property is passed through unchanged. Every
+  * local grouping or order property of the input is rewritten from the
+  * input variables to the group-by variables they are assigned to. As soon
+  * as one property mentions a column that has no such group-by variable,
+  * that property is dropped and the remaining input properties are ignored.
+  * <p>
+  * Obs: We don't propagate properties corresponding to decors, since they
+  * are func. dep. on the group-by variables.
+  *
+  * @param op
+  *            the logical operator; it is cast to {@link GroupByOperator}
+  * @param context
+  *            the optimization context (not read by this method)
+  * @throws IllegalStateException
+  *             if the input delivers a local property that is neither a
+  *             grouping nor an order property
+  */
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+     GroupByOperator groupBy = (GroupByOperator) op;
+     ILogicalOperator input = groupBy.getInputs().get(0).getValue();
+     IPhysicalPropertiesVector inputProps = input.getDeliveredPhysicalProperties();
+     IPartitioningProperty partitioning = inputProps.getPartitioningProperty();
+     List<ILocalStructuralProperty> inputLocals = inputProps.getLocalProperties();
+     List<ILocalStructuralProperty> translated = new LinkedList<ILocalStructuralProperty>();
+
+     if (inputLocals != null) {
+         for (ILocalStructuralProperty inputLocal : inputLocals) {
+             // Set when some column of the current property cannot be mapped.
+             boolean untranslatable = false;
+             switch (inputLocal.getPropertyType()) {
+                 case LOCAL_GROUPING_PROPERTY: {
+                     LocalGroupingProperty inputGrouping = (LocalGroupingProperty) inputLocal;
+                     Set<LogicalVariable> outputColumns = new ListSet<LogicalVariable>();
+                     for (LogicalVariable inputVar : inputGrouping.getColumnSet()) {
+                         LogicalVariable outputVar = getLhsGbyVar(groupBy, inputVar);
+                         if (outputVar == null) {
+                             untranslatable = true;
+                         } else {
+                             outputColumns.add(outputVar);
+                         }
+                     }
+                     if (!untranslatable) {
+                         translated.add(new LocalGroupingProperty(outputColumns));
+                     }
+                     break;
+                 }
+                 case LOCAL_ORDER_PROPERTY: {
+                     LocalOrderProperty inputOrder = (LocalOrderProperty) inputLocal;
+                     List<OrderColumn> outputOrder = new ArrayList<OrderColumn>();
+                     for (OrderColumn inputColumn : inputOrder.getOrderColumns()) {
+                         LogicalVariable outputVar = getLhsGbyVar(groupBy, inputColumn.getColumn());
+                         if (outputVar == null) {
+                             untranslatable = true;
+                         } else {
+                             outputOrder.add(new OrderColumn(outputVar, inputColumn.getOrder()));
+                         }
+                     }
+                     if (!untranslatable) {
+                         translated.add(new LocalOrderProperty(outputOrder));
+                     }
+                     break;
+                 }
+                 default: {
+                     throw new IllegalStateException();
+                 }
+             }
+             // The first property that cannot be translated ends the scan.
+             if (untranslatable) {
+                 break;
+             }
+         }
+     }
+     deliveredProperties = new StructuralPropertiesVector(partitioning, translated);
+ }
+
+ /**
+  * Builds the physical requirements this group-by places on its single
+  * input.
+  * <p>
+  * The base requirement is a local grouping on the group-by columns. If a
+  * nested plan has an AGGREGATE root whose input is itself executed by an
+  * {@link AbstractPreclusteredGroupByPOperator}, that operator's columns are
+  * appended to the grouping (only the first such nested group-by is used).
+  * When the parent requires local order properties, they are rewritten from
+  * the group-by/decor variables to the variables referenced by the
+  * corresponding expressions; the rewritten list replaces the grouping
+  * requirement only if every column could be rewritten and
+  * {@code PropertiesUtil.matchLocalProperties} accepts it. In PARTITIONED
+  * execution mode an unordered partitioning on the group-by columns is
+  * required as well.
+  *
+  * @param op
+  *            the logical operator; it is cast to {@link GroupByOperator}
+  * @param reqdByParent
+  *            the properties required by the parent, may be {@code null}
+  * @return the requirements for the single child, with no partitioning
+  *         coordination
+  * @throws IllegalStateException
+  *             if a group-by or decor expression matched by a parent order
+  *             column is not a variable reference
+  */
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+         IPhysicalPropertiesVector reqdByParent) {
+     StructuralPropertiesVector[] childRequirements = new StructuralPropertiesVector[1];
+     List<ILocalStructuralProperty> requiredLocals = new ArrayList<ILocalStructuralProperty>(1);
+     Set<LogicalVariable> groupingVars = new ListSet<LogicalVariable>(columnList);
+     LocalGroupingProperty grouping = new LocalGroupingProperty(groupingVars,
+             new ArrayList<LogicalVariable>(columnList));
+
+     GroupByOperator groupBy = (GroupByOperator) op;
+     // try to propagate secondary order requirements from nested
+     // groupings; stop at the first nested pre-clustered group-by found
+     nestedPlans: for (ILogicalPlan nestedPlan : groupBy.getNestedPlans()) {
+         for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+             AbstractLogicalOperator root = (AbstractLogicalOperator) rootRef.getValue();
+             if (root.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+                 continue;
+             }
+             AbstractLogicalOperator rootInput = (AbstractLogicalOperator) root.getInputs().get(0).getValue();
+             IPhysicalOperator rootInputPhysical = rootInput.getPhysicalOperator();
+             if (rootInputPhysical instanceof AbstractPreclusteredGroupByPOperator) {
+                 List<LogicalVariable> secondaryColumns = ((AbstractPreclusteredGroupByPOperator) rootInputPhysical)
+                         .getGbyColumns();
+                 grouping.getColumnSet().addAll(secondaryColumns);
+                 grouping.getPreferredOrderEnforcer().addAll(secondaryColumns);
+                 break nestedPlans;
+             }
+         }
+     }
+
+     requiredLocals.add(grouping);
+
+     // propagate parent requirements
+     List<ILocalStructuralProperty> parentLocals = (reqdByParent == null) ? null : reqdByParent
+             .getLocalProperties();
+     if (parentLocals != null) {
+         boolean translatable = true;
+         List<ILocalStructuralProperty> rewritten = new ArrayList<ILocalStructuralProperty>(parentLocals.size());
+         for (ILocalStructuralProperty parentLocal : parentLocals) {
+             if (parentLocal.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
+                 translatable = false;
+                 break;
+             }
+             LocalOrderProperty parentOrder = (LocalOrderProperty) parentLocal;
+             List<OrderColumn> childOrder = new ArrayList<OrderColumn>();
+             for (OrderColumn parentColumn : parentOrder.getOrderColumns()) {
+                 // Look the column up among the group-by pairs first, then among the decors.
+                 Pair<LogicalVariable, Mutable<ILogicalExpression>> binding = getGbyPairByRhsVar(groupBy,
+                         parentColumn.getColumn());
+                 if (binding == null) {
+                     binding = getDecorPairByRhsVar(groupBy, parentColumn.getColumn());
+                 }
+                 if (binding == null) {
+                     translatable = false;
+                     break;
+                 }
+                 ILogicalExpression boundExpr = binding.second.getValue();
+                 if (boundExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                     throw new IllegalStateException(
+                             "Right hand side of group-by assignment should have been normalized to a variable reference.");
+                 }
+                 LogicalVariable inputVar = ((VariableReferenceExpression) boundExpr).getVariableReference();
+                 childOrder.add(new OrderColumn(inputVar, parentColumn.getOrder()));
+             }
+             rewritten.add(new LocalOrderProperty(childOrder));
+         }
+
+         // One functional dependency per decor: group-by variables -> decor's referenced variable.
+         List<FunctionalDependency> decorDependencies = new ArrayList<FunctionalDependency>();
+         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decor : groupBy.getDecorList()) {
+             List<LogicalVariable> determinant = groupBy.getGbyVarList();
+             List<LogicalVariable> dependent = new ArrayList<LogicalVariable>(1);
+             dependent.add(((VariableReferenceExpression) decor.second.getValue()).getVariableReference());
+             decorDependencies.add(new FunctionalDependency(determinant, dependent));
+         }
+
+         if (translatable
+                 && PropertiesUtil.matchLocalProperties(requiredLocals, rewritten,
+                         new HashMap<LogicalVariable, EquivalenceClass>(), decorDependencies)) {
+             requiredLocals = rewritten;
+         }
+     }
+
+     IPartitioningProperty partitioning = null;
+     if (((AbstractLogicalOperator) op).getExecutionMode() == ExecutionMode.PARTITIONED) {
+         partitioning = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+     }
+     childRequirements[0] = new StructuralPropertiesVector(partitioning, requiredLocals);
+     return new PhysicalRequirements(childRequirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ /**
+  * Finds the group-by pair whose first component is the given variable
+  * (compared by reference).
+  * NOTE(review): despite the "Rhs" in the name, the match is made against
+  * {@code first}, not against the expression in {@code second}.
+  *
+  * @return the first matching pair, or {@code null} if there is none
+  */
+ private static Pair<LogicalVariable, Mutable<ILogicalExpression>> getGbyPairByRhsVar(GroupByOperator gby,
+         LogicalVariable var) {
+     Pair<LogicalVariable, Mutable<ILogicalExpression>> match = null;
+     for (Pair<LogicalVariable, Mutable<ILogicalExpression>> candidate : gby.getGroupByList()) {
+         if (var == candidate.first) {
+             match = candidate;
+             break;
+         }
+     }
+     return match;
+ }
+
+ /**
+  * Finds the decor pair whose first component is the given variable
+  * (compared by reference).
+  * NOTE(review): despite the "Rhs" in the name, the match is made against
+  * {@code first}, not against the expression in {@code second}.
+  *
+  * @return the first matching pair, or {@code null} if there is none
+  */
+ private static Pair<LogicalVariable, Mutable<ILogicalExpression>> getDecorPairByRhsVar(GroupByOperator gby,
+         LogicalVariable var) {
+     Pair<LogicalVariable, Mutable<ILogicalExpression>> match = null;
+     for (Pair<LogicalVariable, Mutable<ILogicalExpression>> candidate : gby.getDecorList()) {
+         if (var == candidate.first) {
+             match = candidate;
+             break;
+         }
+     }
+     return match;
+ }
+
+ /**
+  * Returns the group-by variable (the pair's first component) whose
+  * expression is a reference to the given variable (compared by reference).
+  * Pairs are scanned in list order and the first match wins.
+  *
+  * @return the matching group-by variable, or {@code null} if no group-by
+  *         expression references {@code var}
+  * @throws IllegalStateException
+  *             if a group-by expression reached during the scan is not a
+  *             variable reference
+  */
+ private static LogicalVariable getLhsGbyVar(GroupByOperator gby, LogicalVariable var) {
+     LogicalVariable lhs = null;
+     for (Pair<LogicalVariable, Mutable<ILogicalExpression>> binding : gby.getGroupByList()) {
+         ILogicalExpression rhs = binding.second.getValue();
+         if (rhs.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+             throw new IllegalStateException(
+                     "Right hand side of group by assignment should have been normalized to a variable reference.");
+         }
+         LogicalVariable referenced = ((VariableReferenceExpression) rhs).getVariableReference();
+         if (referenced == var) {
+             lhs = binding.first;
+             break;
+         }
+     }
+     return lhs;
+ }
+
+ /**
+  * @return always {@code true} for this operator
+  */
+ @Override
+ public boolean expensiveThanMaterialization() {
+     final boolean expensive = true;
+     return expensive;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPropagatePropertiesForUsedVariablesPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPropagatePropertiesForUsedVariablesPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPropagatePropertiesForUsedVariablesPOperator.java
new file mode 100644
index 0000000..c77222b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPropagatePropertiesForUsedVariablesPOperator.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+public abstract class AbstractPropagatePropertiesForUsedVariablesPOperator extends AbstractPhysicalOperator {
+
+    /**
+     * Computes this operator's delivered properties from those delivered by
+     * its first input.
+     * <p>
+     * The input's partitioning property is passed through unchanged. Each
+     * local property of the input is narrowed with
+     * {@code retainVariables(usedVariables)}; a property for which that call
+     * returns {@code null} is dropped. If the input reports no local
+     * properties ({@code null}), an empty list is delivered.
+     *
+     * @param op
+     *            the operator whose first input's delivered properties are
+     *            propagated
+     * @param usedVariables
+     *            the variables handed to {@code retainVariables}
+     */
+    public void computeDeliveredPropertiesForUsedVariables(ILogicalOperator op, List<LogicalVariable> usedVariables) {
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty();
+        List<ILocalStructuralProperty> downPropsLocal = op2.getDeliveredPhysicalProperties().getLocalProperties();
+        List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+        // getLocalProperties() is treated as nullable elsewhere in this package
+        // (AbstractPreclusteredGroupByPOperator null-checks it), so guard here too
+        // instead of failing with a NullPointerException in the loop.
+        if (downPropsLocal != null) {
+            for (ILocalStructuralProperty lsp : downPropsLocal) {
+                ILocalStructuralProperty propagatedProp = lsp.retainVariables(usedVariables);
+                if (propagatedProp != null) {
+                    propsLocal.add(propagatedProp);
+                }
+            }
+        }
+        deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
+    }
+}