You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/07/14 20:20:23 UTC
asterixdb git commit: Updated exchange operator names to include
'exchange' in the class name.
Repository: asterixdb
Updated Branches:
refs/heads/master c1f984e65 -> 61ec12893
Updated exchange operator names to include 'exchange' in the class name.
Change-Id: Icd10c9a7ba0ad9e7b899a350d14213c2c0951e75
Reviewed-on: https://asterix-gerrit.ics.uci.edu/995
Reviewed-by: Yingyi Bu <bu...@gmail.com>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/61ec1289
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/61ec1289
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/61ec1289
Branch: refs/heads/master
Commit: 61ec128932a6d56ea441915effc46f4248858087
Parents: c1f984e
Author: Preston Carman <pr...@apache.org>
Authored: Thu Jul 14 09:25:05 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Jul 14 13:19:25 2016 -0700
----------------------------------------------------------------------
...ceRandomPartitioningFeedComputationRule.java | 4 +-
.../logical/visitors/UsedVariableVisitor.java | 8 +-
.../physical/BroadcastExchangePOperator.java | 75 +++++++++
.../operators/physical/BroadcastPOperator.java | 75 ---------
.../RandomPartitionExchangePOperator.java | 92 +++++++++++
.../physical/RandomPartitionPOperator.java | 92 -----------
.../RangePartitionExchangePOperator.java | 126 +++++++++++++++
.../RangePartitionMergeExchangePOperator.java | 155 +++++++++++++++++++
.../physical/RangePartitionMergePOperator.java | 155 -------------------
.../physical/RangePartitionPOperator.java | 126 ---------------
.../rules/EnforceStructuralPropertiesRule.java | 16 +-
11 files changed, 462 insertions(+), 462 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index fb81885..4f5a848 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -34,7 +34,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperat
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -77,7 +77,7 @@ public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebrai
}
};
- exchangeOp.setPhysicalOperator(new RandomPartitionPOperator(domain));
+ exchangeOp.setPhysicalOperator(new RandomPartitionExchangePOperator(domain));
op.getInputs().get(0).setValue(exchangeOp);
exchangeOp.getInputs().add(new MutableObject<ILogicalOperator>(scanOp));
ExecutionMode em = ((AbstractLogicalOperator) scanOp).getExecutionMode();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index efb45de..e966406 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -68,8 +68,8 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperato
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -155,14 +155,14 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
break;
}
case RANGE_PARTITION_EXCHANGE: {
- RangePartitionPOperator concreteOp = (RangePartitionPOperator) physOp;
+ RangePartitionExchangePOperator concreteOp = (RangePartitionExchangePOperator) physOp;
for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
usedVariables.add(partCol.getColumn());
}
break;
}
case RANGE_PARTITION_MERGE_EXCHANGE: {
- RangePartitionMergePOperator concreteOp = (RangePartitionMergePOperator) physOp;
+ RangePartitionMergeExchangePOperator concreteOp = (RangePartitionMergeExchangePOperator) physOp;
for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
usedVariables.add(partCol.getColumn());
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastExchangePOperator.java
new file mode 100644
index 0000000..d9e1540
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastExchangePOperator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+
+public class BroadcastExchangePOperator extends AbstractExchangePOperator {
+
+ private INodeDomain domain;
+
+ public BroadcastExchangePOperator(INodeDomain domain) {
+ this.domain = domain;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.BROADCAST_EXCHANGE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ IPartitioningProperty pp = new BroadcastPartitioningProperty(domain);
+ // Broadcasts will destroy input local properties.
+ this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<ILocalStructuralProperty>());
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ IConnectorDescriptor conn = new MToNReplicatingConnectorDescriptor(spec);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
deleted file mode 100644
index 69bfe2a..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import java.util.ArrayList;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
-
-public class BroadcastPOperator extends AbstractExchangePOperator {
-
- private INodeDomain domain;
-
- public BroadcastPOperator(INodeDomain domain) {
- this.domain = domain;
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.BROADCAST_EXCHANGE;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty pp = new BroadcastPartitioningProperty(domain);
- // Broadcasts will destroy input local properties.
- this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<ILocalStructuralProperty>());
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- return emptyUnaryRequirements();
- }
-
- @Override
- public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
- ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
- IConnectorDescriptor conn = new MToNReplicatingConnectorDescriptor(spec);
- return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionExchangePOperator.java
new file mode 100644
index 0000000..cba8f97
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionExchangePOperator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RandomPartitionExchangePOperator extends AbstractExchangePOperator {
+
+ private final INodeDomain domain;
+
+ public RandomPartitionExchangePOperator(INodeDomain domain) {
+ this.domain = domain;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
+ opSchema, context);
+ builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory();
+ MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANDOM_PARTITION_EXCHANGE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain),
+ op2.getDeliveredPhysicalProperties().getLocalProperties());
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ return emptyUnaryRequirements();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
deleted file mode 100644
index 7237d24..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-
-public class RandomPartitionPOperator extends AbstractExchangePOperator {
-
- private final INodeDomain domain;
-
- public RandomPartitionPOperator(INodeDomain domain) {
- this.domain = domain;
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
- opSchema, context);
- builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
- ILogicalOperator src = op.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, op, 0);
- }
-
- @Override
- public boolean isMicroOperator() {
- return false;
- }
-
- @Override
- public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
- ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
- ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory();
- MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
- return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.RANDOM_PARTITION_EXCHANGE;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain),
- op2.getDeliveredPhysicalProperties().getLocalProperties());
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- return emptyUnaryRequirements();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
new file mode 100644
index 0000000..225ffa0
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
+
+ private List<OrderColumn> partitioningFields;
+ private INodeDomain domain;
+ private IRangeMap rangeMap;
+
+ public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+ this.partitioningFields = partitioningFields;
+ this.domain = domain;
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE;
+ }
+
+ public List<OrderColumn> getPartitioningFields() {
+ return partitioningFields;
+ }
+
+ public INodeDomain getDomain() {
+ return domain;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
+ this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ int n = partitioningFields.size();
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ int i = 0;
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ sortFields[i] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ OrderKind order = oc.getOrder();
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+ }
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+ i++;
+ }
+ ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
new file mode 100644
index 0000000..f56a5dc
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator {
+
+ private List<OrderColumn> partitioningFields;
+ private INodeDomain domain;
+ private IRangeMap rangeMap;
+
+ public RangePartitionMergeExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+ this.partitioningFields = partitioningFields;
+ this.domain = domain;
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANGE_PARTITION_MERGE_EXCHANGE;
+ }
+
+ public List<OrderColumn> getPartitioningFields() {
+ return partitioningFields;
+ }
+
+ public INodeDomain getDomain() {
+ return domain;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
+ for (OrderColumn oc : partitioningFields) {
+ varList.add(oc.getColumn());
+ }
+ IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(varList), domain);
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
+ List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
+ for (ILocalStructuralProperty prop : op2Locals) {
+ if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
+ locals.add(prop);
+ } else {
+ break;
+ }
+ }
+
+ this.deliveredProperties = new StructuralPropertiesVector(p, locals);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
+ List<OrderColumn> columns = new ArrayList<OrderColumn>();
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ columns.add(new OrderColumn(var, oc.getOrder()));
+ }
+ orderProps.add(new LocalOrderProperty(columns));
+ StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
+ orderProps) };
+ return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ int n = partitioningFields.size();
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ int i = 0;
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ sortFields[i] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ OrderKind order = oc.getOrder();
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+ }
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+ i++;
+ }
+ ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
deleted file mode 100644
index 1f70f2a..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
-import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-
-public class RangePartitionMergePOperator extends AbstractExchangePOperator {
-
- private List<OrderColumn> partitioningFields;
- private INodeDomain domain;
- private IRangeMap rangeMap;
-
- public RangePartitionMergePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
- this.partitioningFields = partitioningFields;
- this.domain = domain;
- this.rangeMap = rangeMap;
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.RANGE_PARTITION_MERGE_EXCHANGE;
- }
-
- public List<OrderColumn> getPartitioningFields() {
- return partitioningFields;
- }
-
- public INodeDomain getDomain() {
- return domain;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
- for (OrderColumn oc : partitioningFields) {
- varList.add(oc.getColumn());
- }
- IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(varList), domain);
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
- List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
- for (ILocalStructuralProperty prop : op2Locals) {
- if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
- locals.add(prop);
- } else {
- break;
- }
- }
-
- this.deliveredProperties = new StructuralPropertiesVector(p, locals);
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
- List<OrderColumn> columns = new ArrayList<OrderColumn>();
- for (OrderColumn oc : partitioningFields) {
- LogicalVariable var = oc.getColumn();
- columns.add(new OrderColumn(var, oc.getOrder()));
- }
- orderProps.add(new LocalOrderProperty(columns));
- StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
- orderProps) };
- return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @Override
- public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
- ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
- int n = partitioningFields.size();
- int[] sortFields = new int[n];
- IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
-
- INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
- INormalizedKeyComputerFactory nkcf = null;
-
- IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- int i = 0;
- for (OrderColumn oc : partitioningFields) {
- LogicalVariable var = oc.getColumn();
- sortFields[i] = opSchema.findVariable(var);
- Object type = env.getVarType(var);
- OrderKind order = oc.getOrder();
- if (i == 0 && nkcfProvider != null && type != null) {
- nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
- }
- IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
- comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
- i++;
- }
- ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
- IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
- return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
- }
-
- @Override
- public String toString() {
- return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
deleted file mode 100644
index 9046ce2..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-
-public class RangePartitionPOperator extends AbstractExchangePOperator {
-
- private List<OrderColumn> partitioningFields;
- private INodeDomain domain;
- private IRangeMap rangeMap;
-
- public RangePartitionPOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
- this.partitioningFields = partitioningFields;
- this.domain = domain;
- this.rangeMap = rangeMap;
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE;
- }
-
- public List<OrderColumn> getPartitioningFields() {
- return partitioningFields;
- }
-
- public INodeDomain getDomain() {
- return domain;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
- this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- return emptyUnaryRequirements();
- }
-
- @Override
- public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
- ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
- int n = partitioningFields.size();
- int[] sortFields = new int[n];
- IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
-
- INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
- INormalizedKeyComputerFactory nkcf = null;
-
- IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- int i = 0;
- for (OrderColumn oc : partitioningFields) {
- LogicalVariable var = oc.getColumn();
- sortFields[i] = opSchema.findVariable(var);
- Object type = env.getVarType(var);
- OrderKind order = oc.getOrder();
- if (i == 0 && nkcfProvider != null && type != null) {
- nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
- }
- IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
- comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
- i++;
- }
- ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
- IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
- return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
- }
-
- @Override
- public String toString() {
- return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/61ec1289/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 53f7dbf..8edf0ba 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -51,7 +51,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
@@ -59,9 +59,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemorySta
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
@@ -540,7 +540,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
.get(OperatorAnnotations.USE_RANGE_CONNECTOR);
- pop = new RangePartitionMergePOperator(ordCols, domain, rangeMap);
+ pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap);
} else {
OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
sortColumns = ordCols.toArray(sortColumns);
@@ -573,18 +573,18 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
break;
}
case ORDERED_PARTITIONED: {
- pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain,
+ pop = new RangePartitionExchangePOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain,
null);
break;
}
case BROADCAST: {
- pop = new BroadcastPOperator(domain);
+ pop = new BroadcastExchangePOperator(domain);
break;
}
case RANDOM: {
RandomPartitioningProperty rpp = (RandomPartitioningProperty) pp;
INodeDomain nd = rpp.getNodeDomain();
- pop = new RandomPartitionPOperator(nd);
+ pop = new RandomPartitionExchangePOperator(nd);
break;
}
default: {