You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:38 UTC
[25/51] [partial] incubator-asterixdb-hyracks git commit: Change
folder structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
new file mode 100644
index 0000000..a24c48d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SimpleAlgebricksAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+
+public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
+
+ private List<LogicalVariable> columnList;
+
+ public PreSortedDistinctByPOperator(List<LogicalVariable> columnList) {
+ this.columnList = columnList;
+ }
+
+ public void setDistinctByColumns(List<LogicalVariable> distinctByColumns) {
+ this.columnList = distinctByColumns;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty();
+ List<ILocalStructuralProperty> propsLocal = op2.getDeliveredPhysicalProperties().getLocalProperties();
+ deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+ List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>();
+ List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+ for (LogicalVariable column : columnList) {
+ orderColumns.add(new OrderColumn(column, OrderKind.ASC));
+ }
+ localProps.add(new LocalOrderProperty(orderColumns));
+ IPartitioningProperty pp = null;
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
+ if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
+ pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+ }
+ pv[0] = new StructuralPropertiesVector(pp, localProps);
+ return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
+ int sz = inputSchemas[0].getSize();
+ int fdSz = sz - columnList.size();
+ int[] fdColumns = new int[fdSz];
+ int j = 0;
+ for (LogicalVariable v : inputSchemas[0]) {
+ if (!columnList.contains(v)) {
+ fdColumns[j++] = inputSchemas[0].findVariable(v);
+ }
+ }
+ int[] keysAndDecs = new int[keys.length + fdColumns.length];
+ for (int i = 0; i < keys.length; i++) {
+ keysAndDecs[i] = keys[i];
+ }
+ for (int i = 0; i < fdColumns.length; i++) {
+ keysAndDecs[i + keys.length] = fdColumns[i];
+ }
+
+ IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+ columnList, context.getTypeEnvironment(op), context);
+ IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
+ IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(
+ aggFactories, keysAndDecs);
+
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
+ /** make fd columns part of the key but the comparator only compares the distinct key columns */
+ PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs,
+ comparatorFactories, aggregatorFactory, recordDescriptor);
+
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
new file mode 100644
index 0000000..bcd998d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansRunningAggregatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+
+public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
+
+ public PreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+ super(columnList);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
+ GroupByOperator gby = (GroupByOperator) op;
+ int numFds = gby.getDecorList().size();
+ int fdColumns[] = new int[numFds];
+ int j = 0;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new AlgebricksException("pre-sorted group-by expects variable references.");
+ }
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ LogicalVariable decor = v.getVariableReference();
+ fdColumns[j++] = inputSchemas[0].findVariable(decor);
+ }
+ // compile subplans and set the gby op. schema accordingly
+ AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
+ IAggregatorDescriptorFactory aggregatorFactory;
+
+ if (((AbstractLogicalOperator) (gby.getNestedPlans().get(0).getRoots().get(0).getValue())).getOperatorTag() == LogicalOperatorTag.RUNNINGAGGREGATE) {
+ aggregatorFactory = new NestedPlansRunningAggregatorFactory(subplans, keys, fdColumns);
+ } else {
+ aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, fdColumns);
+ }
+
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+ columnList, context.getTypeEnvironment(op), context);
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
+
+ PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
+ comparatorFactories, aggregatorFactory, recordDescriptor);
+
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
new file mode 100644
index 0000000..2478859
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+
+public class RandomMergeExchangePOperator extends AbstractExchangePOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANDOM_MERGE_EXCHANGE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ this.deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED,
+ new ArrayList<ILocalStructuralProperty>(0));
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
+ IConnectorDescriptor conn = new MToNReplicatingConnectorDescriptor(spec);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.ONE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
new file mode 100644
index 0000000..c633f7b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RandomPartitionPOperator extends AbstractExchangePOperator {
+
+ private final INodeDomain domain;
+
+ public RandomPartitionPOperator(INodeDomain domain) {
+ this.domain = domain;
+ }
+
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
+ opSchema, context);
+ builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(domain.cardinality());
+ MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANDOM_PARTITION_EXCHANGE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain), op2
+ .getDeliveredPhysicalProperties().getLocalProperties());
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
new file mode 100644
index 0000000..99f68e9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class RangePartitionMergePOperator extends AbstractExchangePOperator {
+
+ private List<OrderColumn> partitioningFields;
+ private INodeDomain domain;
+ private IRangeMap rangeMap;
+
+ public RangePartitionMergePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+ this.partitioningFields = partitioningFields;
+ this.domain = domain;
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANGE_PARTITION_MERGE_EXCHANGE;
+ }
+
+ public List<OrderColumn> getPartitioningFields() {
+ return partitioningFields;
+ }
+
+ public INodeDomain getDomain() {
+ return domain;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
+ for (OrderColumn oc : partitioningFields) {
+ varList.add(oc.getColumn());
+ }
+ IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(varList), domain);
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
+ List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
+ for (ILocalStructuralProperty prop : op2Locals) {
+ if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
+ locals.add(prop);
+ } else {
+ break;
+ }
+ }
+
+ this.deliveredProperties = new StructuralPropertiesVector(p, locals);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
+ List<OrderColumn> columns = new ArrayList<OrderColumn>();
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ columns.add(new OrderColumn(var, oc.getOrder()));
+ }
+ orderProps.add(new LocalOrderProperty(columns));
+ StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
+ orderProps) };
+ return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ int n = partitioningFields.size();
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ int i = 0;
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ sortFields[i] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ OrderKind order = oc.getOrder();
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+ }
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+ i++;
+ }
+ ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
new file mode 100644
index 0000000..66955e9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RangePartitionPOperator extends AbstractExchangePOperator {
+
+ private List<OrderColumn> partitioningFields;
+ private INodeDomain domain;
+ private IRangeMap rangeMap;
+
+ public RangePartitionPOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+ this.partitioningFields = partitioningFields;
+ this.domain = domain;
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE;
+ }
+
+ public List<OrderColumn> getPartitioningFields() {
+ return partitioningFields;
+ }
+
+ public INodeDomain getDomain() {
+ return domain;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
+ this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ int n = partitioningFields.size();
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ int i = 0;
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ sortFields[i] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ OrderKind order = oc.getOrder();
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+ }
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+ i++;
+ }
+ ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
new file mode 100644
index 0000000..d8f7d99
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+
+public class ReplicatePOperator extends AbstractPhysicalOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.SPLIT;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+
+ ReplicateOperator rop = (ReplicateOperator) op;
+ int outputArity = rop.getOutputArity();
+ boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
+
+ SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags);
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ ReplicateOperator rop = (ReplicateOperator) op;
+ int[] outputDependencyLabels = new int[rop.getOutputArity()];
+ // change the labels of outputs that requires materialization to 1
+ boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
+ for (int i = 0; i < rop.getOutputArity(); i++) {
+ if (outputMaterializationFlags[i]) {
+ outputDependencyLabels[i] = 1;
+ }
+ }
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
new file mode 100644
index 0000000..3b2387a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class RunningAggregatePOperator extends AbstractPhysicalOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RUNNING_AGGREGATE;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ RunningAggregateOperator ragg = (RunningAggregateOperator) op;
+ List<LogicalVariable> variables = ragg.getVariables();
+ List<Mutable<ILogicalExpression>> expressions = ragg.getExpressions();
+ int[] outColumns = new int[variables.size()];
+ for (int i = 0; i < outColumns.length; i++) {
+ outColumns[i] = opSchema.findVariable(variables.get(i));
+ }
+ IRunningAggregateEvaluatorFactory[] runningAggFuns = new IRunningAggregateEvaluatorFactory[expressions.size()];
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+ for (int i = 0; i < runningAggFuns.length; i++) {
+ StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) expressions.get(i).getValue();
+ runningAggFuns[i] = expressionRuntimeProvider.createRunningAggregateFunctionFactory(expr,
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+ }
+
+ // TODO push projections into the operator
+ int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
+
+ RunningAggregateRuntimeFactory runtime = new RunningAggregateRuntimeFactory(outColumns, runningAggFuns,
+ projectionList);
+
+ // contribute one Asterix framewriter
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+ builder.contributeMicroOperator(ragg, runtime, recDesc);
+ // and contribute one edge from its child
+ ILogicalOperator src = ragg.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, ragg, 0);
+
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
new file mode 100644
index 0000000..ba299ac
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+public class SinkPOperator extends AbstractPhysicalOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.SINK;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2;
+ List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+ IPhysicalPropertiesVector childsProperties = null;
+
+ for (int i = 0; i < op.getInputs().size(); i++) {
+ op2 = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+ childsProperties = op2.getPhysicalOperator().getDeliveredProperties();
+ if (childsProperties.getLocalProperties() != null) {
+ propsLocal.addAll(childsProperties.getLocalProperties());
+ }
+ }
+
+ deliveredProperties = new StructuralPropertiesVector(childsProperties.getPartitioningProperty(), propsLocal);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements(op.getInputs().size());
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+
+ SinkOperatorDescriptor opDesc = new SinkOperatorDescriptor(spec, op.getInputs().size());
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ for (int i = 0; i < op.getInputs().size(); i++) {
+ builder.contributeGraphEdge(op.getInputs().get(i).getValue(), 0, op, i);
+ }
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
new file mode 100644
index 0000000..9105c1f
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SinkWritePOperator extends AbstractPhysicalOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.SINK_WRITE;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ ILogicalOperator op2 = op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ WriteOperator write = (WriteOperator) op;
+ IDataSink sink = write.getDataSink();
+ IPartitioningProperty pp = sink.getPartitioningProperty();
+ StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
+ return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ WriteOperator write = (WriteOperator) op;
+ int[] columns = new int[write.getExpressions().size()];
+ int i = 0;
+ for (Mutable<ILogicalExpression> exprRef : write.getExpressions()) {
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new NotImplementedException("Only writing variable expressions is supported.");
+ }
+ VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+ LogicalVariable v = varRef.getVariableReference();
+ columns[i++] = inputSchemas[0].findVariable(v);
+ }
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0],
+ context);
+
+ IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
+ context, columns);
+
+ IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtime = mp.getWriteFileRuntime(write.getDataSink(),
+ columns, pf, inputDesc);
+
+ builder.contributeMicroOperator(write, runtime.first, recDesc, runtime.second);
+ ILogicalOperator src = write.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, write, 0);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
new file mode 100644
index 0000000..6ad1fab
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SimpleAlgebricksAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
+
+public class SortGroupByPOperator extends AbstractPhysicalOperator {
+
+ private final int frameLimit;
+ private final OrderColumn[] orderColumns;
+ private final List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
+
+ public SortGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
+ OrderColumn[] orderColumns) {
+ this.frameLimit = frameLimit;
+ this.orderColumns = orderColumns;
+ computeColumnSet(gbyList);
+ }
+
+ private void computeColumnSet(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList) {
+ columnSet.clear();
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ columnSet.add(v.getVariableReference());
+ }
+ }
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.SORT_GROUP_BY;
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + columnSet;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ private List<LogicalVariable> getGbyColumns() {
+ return columnSet;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ List<ILocalStructuralProperty> propsLocal = new LinkedList<ILocalStructuralProperty>();
+
+ GroupByOperator gOp = (GroupByOperator) op;
+ Set<LogicalVariable> columnSet = new ListSet<LogicalVariable>();
+ List<OrderColumn> ocs = new ArrayList<OrderColumn>();
+
+ if (!columnSet.isEmpty()) {
+ propsLocal.add(new LocalGroupingProperty(columnSet));
+ }
+ for (OrderColumn oc : orderColumns) {
+ ocs.add(oc);
+ }
+ propsLocal.add(new LocalOrderProperty(ocs));
+ for (ILogicalPlan p : gOp.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ ILogicalOperator rOp = r.getValue();
+ propsLocal.addAll(rOp.getDeliveredPhysicalProperties().getLocalProperties());
+ }
+ }
+
+ ILogicalOperator op2 = op.getInputs().get(0).getValue();
+ IPhysicalPropertiesVector childProp = op2.getDeliveredPhysicalProperties();
+ deliveredProperties = new StructuralPropertiesVector(childProp.getPartitioningProperty(), propsLocal);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ List<LogicalVariable> gbyCols = getGbyColumns();
+ int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
+ GroupByOperator gby = (GroupByOperator) op;
+ int numFds = gby.getDecorList().size();
+ int fdColumns[] = new int[numFds];
+ int j = 0;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new AlgebricksException("Sort group-by expects variable references.");
+ }
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ LogicalVariable decor = v.getVariableReference();
+ fdColumns[j++] = inputSchemas[0].findVariable(decor);
+ }
+
+ if (gby.getNestedPlans().size() != 1) {
+ throw new AlgebricksException(
+ "Sort group-by currently works only for one nested plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ ILogicalPlan p0 = gby.getNestedPlans().get(0);
+ if (p0.getRoots().size() != 1) {
+ throw new AlgebricksException(
+ "Sort group-by currently works only for one nested plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+ AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+
+ IPartialAggregationTypeComputer partialAggregationTypeComputer = context.getPartialAggregationTypeComputer();
+ List<Object> intermediateTypes = new ArrayList<Object>();
+ int n = aggOp.getExpressions().size();
+ IAggregateEvaluatorFactory[] aff = new IAggregateEvaluatorFactory[n];
+ int i = 0;
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+ IVariableTypeEnvironment aggOpInputEnv = context.getTypeEnvironment(aggOp.getInputs().get(0).getValue());
+ IVariableTypeEnvironment outputEnv = context.getTypeEnvironment(op);
+ for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
+ AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) exprRef.getValue();
+ aff[i++] = expressionRuntimeProvider.createAggregateFunctionFactory(aggFun, aggOpInputEnv, inputSchemas,
+ context);
+ intermediateTypes.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv,
+ context.getMetadataProvider()));
+ }
+
+ int[] keyAndDecFields = new int[keys.length + fdColumns.length];
+ for (i = 0; i < keys.length; ++i) {
+ keyAndDecFields[i] = keys[i];
+ }
+ for (i = 0; i < fdColumns.length; i++) {
+ keyAndDecFields[keys.length + i] = fdColumns[i];
+ }
+
+ List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>();
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) {
+ keyAndDecVariables.add(p.first);
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+ keyAndDecVariables.add(GroupByOperator.getDecorVariable(p));
+ }
+
+ for (LogicalVariable var : keyAndDecVariables) {
+ aggOpInputEnv.setVarType(var, outputEnv.getVarType(var));
+ }
+
+ compileSubplans(inputSchemas[0], gby, opSchema, context);
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+
+ IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[gbyCols.size()];
+ IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
+ i = 0;
+ for (LogicalVariable v : gbyCols) {
+ Object type = aggOpInputEnv.getVarType(v);
+ if (orderColumns[i].getOrder() == OrderKind.ASC) {
+ compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
+ } else {
+ compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, false);
+ }
+ i++;
+ }
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
+
+ IAggregateEvaluatorFactory[] merges = new IAggregateEvaluatorFactory[n];
+ List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+ IOperatorSchema[] localInputSchemas = new IOperatorSchema[1];
+ localInputSchemas[0] = new OperatorSchemaImpl();
+ for (i = 0; i < n; i++) {
+ AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
+ .get(i).getValue();
+ aggFun.getUsedVariables(usedVars);
+ }
+ i = 0;
+ for (Object type : intermediateTypes) {
+ aggOpInputEnv.setVarType(usedVars.get(i++), type);
+ }
+ for (LogicalVariable keyVar : keyAndDecVariables) {
+ localInputSchemas[0].addVariable(keyVar);
+ }
+ for (LogicalVariable usedVar : usedVars) {
+ localInputSchemas[0].addVariable(usedVar);
+ }
+ for (i = 0; i < n; i++) {
+ AggregateFunctionCallExpression mergeFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
+ .get(i).getValue();
+ merges[i] = expressionRuntimeProvider.createAggregateFunctionFactory(mergeFun, aggOpInputEnv,
+ localInputSchemas, context);
+ }
+ RecordDescriptor partialAggRecordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ localInputSchemas[0], context);
+
+ IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aff,
+ keyAndDecFields);
+ IAggregatorDescriptorFactory mergeFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(merges,
+ keyAndDecFields);
+
+ INormalizedKeyComputerFactory normalizedKeyFactory = null;
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ if (nkcfProvider == null) {
+ normalizedKeyFactory = null;
+ }
+ Object type = aggOpInputEnv.getVarType(gbyCols.get(0));
+ normalizedKeyFactory = orderColumns[0].getOrder() == OrderKind.ASC ? nkcfProvider
+ .getNormalizedKeyComputerFactory(type, true) : nkcfProvider
+ .getNormalizedKeyComputerFactory(type, false);
+ SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, frameLimit, keys,
+ keyAndDecFields, normalizedKeyFactory, compFactories, aggregatorFactory, mergeFactory,
+ partialAggRecordDescriptor, recordDescriptor, false);
+
+ contributeOpDesc(builder, gby, gbyOpDesc);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ int[] outputDependencyLabels = new int[] { 1 };
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
new file mode 100644
index 0000000..7f29375
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class SortMergeExchangePOperator extends AbstractExchangePOperator {
+
+ private final OrderColumn[] sortColumns;
+
+ public SortMergeExchangePOperator(OrderColumn[] sortColumns) {
+ this.sortColumns = sortColumns;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.SORT_MERGE_EXCHANGE;
+ }
+
+ public OrderColumn[] getSortColumns() {
+ return sortColumns;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(getOperatorTag());
+ sb.append(" [");
+ sb.append(sortColumns[0]);
+ for (int i = 1; i < sortColumns.length; i++) {
+ sb.append(", " + sortColumns[i]);
+ }
+ sb.append(" ]");
+ return sb.toString();
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator inp1 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ IPhysicalPropertiesVector pv1 = inp1.getDeliveredPhysicalProperties();
+ if (pv1 == null) {
+ inp1.computeDeliveredPhysicalProperties(context);
+ pv1 = inp1.getDeliveredPhysicalProperties();
+ }
+
+ List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+ List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length);
+ for (ILocalStructuralProperty prop : pv1.getLocalProperties()) {
+ if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
+ LocalOrderProperty lop = (LocalOrderProperty) prop;
+ for (OrderColumn oc : lop.getOrderColumns()) {
+ if (oc.equals(sortColumns[orderColumns.size()])) {
+ orderColumns.add(oc);
+ if (orderColumns.size() == sortColumns.length) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ } else {
+ continue;
+ }
+ }
+ if (orderColumns.size() > 0) {
+ localProps.add(new LocalOrderProperty(orderColumns));
+ }
+ this.deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, localProps);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length);
+ localProps.add(new LocalOrderProperty(Arrays.asList(sortColumns)));
+ StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
+ localProps) };
+ return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ int n = sortColumns.length;
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+ IBinaryHashFunctionFactory[] hashFuns = new IBinaryHashFunctionFactory[n];
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
+ for (int i = 0; i < n; i++) {
+ sortFields[i] = opSchema.findVariable(sortColumns[i].getColumn());
+ Object type = env.getVarType(sortColumns[i].getColumn());
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
+ IBinaryHashFunctionFactoryProvider bhffp = context.getBinaryHashFunctionFactoryProvider();
+ hashFuns[i] = bhffp.getBinaryHashFunctionFactory(type);
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
+ }
+ }
+ ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(sortFields, hashFuns);
+ IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.ONE);
+ }
+
+}