You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/10 01:43:01 UTC
[2/2] git commit: DRILL-576: Add costing - new plans for joins and
aggregations,
including distributions. - Utilize GroupScan getSize() for costing - Add
cleanup() methods to MergeJoinBatch and HashJoinBatch. - Don't match hash
aggr rule if number of grouping cols is 0.
DRILL-576: Add costing
- new plans for joins and aggregations, including distributions.
- Utilize GroupScan getSize() for costing
- Add cleanup() methods to MergeJoinBatch and HashJoinBatch.
- Don't match hash aggr rule if number of grouping cols is 0.
- Fix initialization of maxOccupiedIndex in HashAggr and HashTable.
- Fix less-than comparison for cost when row counts are the same.
- Improve fragment identification for better debugging.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3fa1322b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3fa1322b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3fa1322b
Branch: refs/heads/master
Commit: 3fa1322b31605de1e47e70912ad5aadf598c4e99
Parents: b7bf00c
Author: Aman Sinha <as...@mapr.com>
Authored: Fri Apr 4 11:56:44 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri May 9 16:36:18 2014 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ops/QueryContext.java | 1 +
.../drill/exec/physical/base/Exchange.java | 9 -
.../exec/physical/config/BroadcastExchange.java | 5 -
.../physical/config/HashToMergeExchange.java | 93 ++++++
.../physical/config/HashToRandomExchange.java | 5 -
.../config/OrderedPartitionExchange.java | 10 -
.../physical/config/SingleMergeExchange.java | 7 +-
.../exec/physical/config/UnionExchange.java | 6 -
.../physical/impl/aggregate/HashAggBatch.java | 6 +-
.../impl/aggregate/HashAggTemplate.java | 9 +-
.../physical/impl/common/HashTableTemplate.java | 2 +-
.../exec/physical/impl/join/MergeJoinBatch.java | 11 +-
.../exec/planner/common/DrillFilterRelBase.java | 23 +-
.../exec/planner/common/DrillLimitRelBase.java | 20 ++
.../planner/common/DrillProjectRelBase.java | 14 +-
.../exec/planner/common/DrillScanRelBase.java | 5 +-
.../exec/planner/common/DrillScreenRelBase.java | 12 +-
.../drill/exec/planner/cost/DrillCostBase.java | 311 +++++++++++++++++++
.../exec/planner/cost/DrillRelOptCost.java | 28 ++
.../planner/cost/DrillRelOptCostFactory.java | 31 ++
.../exec/planner/logical/DrillAggregateRel.java | 5 -
.../planner/logical/DrillAggregateRule.java | 6 +
.../exec/planner/logical/DrillRuleSets.java | 15 +-
.../exec/planner/logical/DrillScanRel.java | 43 ++-
.../exec/planner/physical/AggPruleBase.java | 57 ++++
.../planner/physical/BroadcastExchangePrel.java | 83 +++++
.../physical/DrillDistributionTrait.java | 5 +-
.../physical/DrillDistributionTraitDef.java | 17 +-
.../drill/exec/planner/physical/FilterPrel.java | 8 +-
.../exec/planner/physical/HashAggPrel.java | 127 ++++++++
.../exec/planner/physical/HashAggPrule.java | 93 ++++++
.../exec/planner/physical/HashJoinPrel.java | 166 ++++++++++
.../exec/planner/physical/HashJoinPrule.java | 92 ++++++
.../physical/HashToMergeExchangePrel.java | 106 +++++++
.../physical/HashToRandomExchangePrel.java | 35 ++-
.../exec/planner/physical/JoinPruleBase.java | 66 ++++
.../exec/planner/physical/MergeJoinPrel.java | 21 +-
.../exec/planner/physical/MergeJoinPrule.java | 75 ++---
.../physical/OrderedPartitionExchangePrel.java | 18 +-
.../exec/planner/physical/PlannerSettings.java | 18 ++
.../drill/exec/planner/physical/ScanPrel.java | 53 +++-
.../physical/SingleMergeExchangePrel.java | 37 ++-
.../drill/exec/planner/physical/SortPrel.java | 27 +-
.../exec/planner/physical/StreamAggPrel.java | 26 +-
.../exec/planner/physical/StreamAggPrule.java | 103 +++---
.../drill/exec/planner/physical/TopNPrel.java | 23 +-
.../planner/physical/UnionExchangePrel.java | 27 +-
.../drill/exec/planner/sql/DrillSqlWorker.java | 14 +
.../org/apache/drill/exec/work/WorkManager.java | 3 +-
.../apache/drill/exec/work/foreman/Foreman.java | 7 +
.../exec/work/fragment/FragmentExecutor.java | 9 +
.../org/apache/drill/TestAltSortQueries.java | 2 +
.../org/apache/drill/TestTpchDistributed.java | 1 +
pom.xml | 2 +-
54 files changed, 1792 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index e3d2f54..e541200 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -57,6 +57,7 @@ public class QueryContext{
this.session = session;
this.timer = new Multitimer<>(QuerySetup.class);
this.plannerSettings = new PlannerSettings(session.getOptions());
+ this.plannerSettings.setNumEndPoints(this.getActiveEndpoints().size());
}
public PlannerSettings getPlannerSettings(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
index 80cf5a2..2d2372a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
@@ -91,13 +91,4 @@ public interface Exchange extends PhysicalOperator {
*/
public PhysicalOperator getChild();
-
-
- /**
- * Informs the planner whether or not this particular exchange supports an incoming stream that has an attached selection vector.
- * @param mode
- * @return
- */
- @JsonIgnore
- public boolean supportsSelectionVector();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
index 256d3d9..95d8803 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
@@ -71,9 +71,4 @@ public class BroadcastExchange extends AbstractExchange {
public int getMaxSendWidth() {
return Integer.MAX_VALUE;
}
-
- @Override
- public boolean supportsSelectionVector() {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
new file mode 100644
index 0000000..8bae26f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/** Exchange whose sender is a HashPartitionSender over the distribution expression and whose receiver is a MergingReceiverPOP over the orderings. */
+@JsonTypeName("hash-to-merge-exchange")
+public class HashToMergeExchange extends AbstractExchange{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToMergeExchange.class);
+
+ private final LogicalExpression distExpr; // handed to the sender created in getSender()
+ private final List<Ordering> orderExprs; // handed to the receiver created in getReceiver()
+ //ephemeral for setup tasks.
+ private List<DrillbitEndpoint> senderLocations;
+ private List<DrillbitEndpoint> receiverLocations;
+
+ @JsonCreator
+ public HashToMergeExchange(@JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("expr") LogicalExpression expr,
+ @JsonProperty("orderings") List<Ordering> orderExprs) {
+ super(child);
+ this.distExpr = expr;
+ this.orderExprs = orderExprs;
+ }
+
+ @Override
+ public int getMaxSendWidth() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ protected void setupSenders(List<DrillbitEndpoint> senderLocations) {
+ this.senderLocations = senderLocations;
+ }
+
+ @Override
+ protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) {
+ this.receiverLocations = receiverLocations;
+ }
+
+ @Override
+ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
+ return new HashPartitionSender(receiverMajorFragmentId, child, distExpr, receiverLocations);
+ }
+
+ @Override
+ public Receiver getReceiver(int minorFragmentId) {
+ return new MergingReceiverPOP(senderMajorFragmentId, senderLocations, orderExprs);
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new HashToMergeExchange(child, distExpr, orderExprs);
+ }
+
+ @JsonProperty("orderings") // same name the @JsonCreator constructor reads, so the written value can be read back
+ public List<Ordering> getOrderExpressions(){
+ return orderExprs;
+ }
+
+ @JsonProperty("expr") // exposes the creator's "expr" property; without a getter it would not be written out
+ public LogicalExpression getExpression(){
+ return distExpr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
index 61c2f1a..7ad2b65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
@@ -79,11 +79,6 @@ public class HashToRandomExchange extends AbstractExchange{
return new HashToRandomExchange(child, expr);
}
- @Override
- public boolean supportsSelectionVector() {
- return true;
- }
-
@JsonProperty("expr")
public LogicalExpression getExpression(){
return expr;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index c49509f..2b80262 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -100,14 +100,4 @@ public class OrderedPartitionExchange extends AbstractExchange {
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
return new OrderedPartitionExchange(orderings, ref, child, recordsToSample, samplingFactor, completionFactor);
}
-
- @Override
- public boolean supportsSelectionVector() {
- return true;
- }
-
-
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index df13a84..26d881d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -84,14 +84,9 @@ public class SingleMergeExchange extends AbstractExchange {
return new SingleMergeExchange(child, orderExpr);
}
- @Override
- public boolean supportsSelectionVector() {
- return true;
- }
-
@JsonProperty("orderings")
public List<Ordering> getOrderings() {
return this.orderExpr;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index 78270d8..c295ac3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -75,10 +75,4 @@ public class UnionExchange extends AbstractExchange{
return new UnionExchange(child);
}
- @Override
- public boolean supportsSelectionVector() {
- return false;
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index ee5cfa8..aa6cd54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -137,7 +137,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
case CLEANUP_AND_RETURN:
container.clear();
aggregator.cleanup();
- incoming.cleanup();
done = true;
return aggregator.getOutcome();
case RETURN_OUTCOME:
@@ -277,6 +276,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ incoming.cleanup();
+ }
@Override
protected void killIncoming() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index c50a86a..5f26054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -95,7 +95,7 @@ public abstract class HashAggTemplate implements HashAggregator {
public class BatchHolder {
private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
- int maxOccupiedIdx = 0;
+ int maxOccupiedIdx = -1;
private BatchHolder() {
@@ -288,6 +288,13 @@ public abstract class HashAggTemplate implements HashAggregator {
}
private void allocateOutgoing() {
+
+ // At present, since we output all records at once, we create the outgoing batch
+ // with a size of numGroupedRecords..however this has to be restricted to max of 64K right
+ // now otherwise downstream operators will break.
+ // TODO: allow outputting arbitrarily large number of records in batches
+ assert (numGroupedRecords < Character.MAX_VALUE);
+
for (VectorAllocator a : keyAllocators) {
if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords);
a.alloc(numGroupedRecords);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 23a0cf5..402e395 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -113,7 +113,7 @@ public abstract class HashTableTemplate implements HashTable {
// Array of hash values - this is useful when resizing the hash table
private IntVector hashValues;
- int maxOccupiedIdx = 0;
+ int maxOccupiedIdx = -1;
private BatchHolder(int idx) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index db90085..9a055bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -153,8 +153,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
status.resetOutputPos();
if (outcome == JoinOutcome.NO_MORE_DATA) {
- left.cleanup();
- right.cleanup();
logger.debug("NO MORE DATA; returning {} NONE");
return IterOutcome.NONE;
}
@@ -186,8 +184,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
kill();
return IterOutcome.STOP;
case NO_MORE_DATA:
- left.cleanup();
- right.cleanup();
logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
case SCHEMA_CHANGED:
@@ -222,6 +218,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
right.kill();
}
+ @Override
+ public void cleanup() {
+ super.cleanup();
+
+ left.cleanup();
+ right.cleanup();
+ }
private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
JVar incomingLeftRecordBatch, JVar joinStatus, ErrorCollector collector) throws ClassTransformationException {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java
index 955729b..865c0fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java
@@ -17,21 +17,28 @@
*/
package org.apache.drill.exec.planner.common;
+import java.util.List;
+
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.logical.DrillImplementor;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.torel.ConversionContext;
import org.eigenbase.rel.FilterRelBase;
import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
import org.eigenbase.relopt.Convention;
import org.eigenbase.relopt.RelOptCluster;
import org.eigenbase.relopt.RelOptCost;
import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelOptUtil;
import org.eigenbase.relopt.RelTraitSet;
import org.eigenbase.rex.RexNode;
@@ -39,14 +46,28 @@ import org.eigenbase.rex.RexNode;
* Base class for logical and physical Filters implemented in Drill
*/
public abstract class DrillFilterRelBase extends FilterRelBase implements DrillRelNode {
+ int numConjuncts = 0;
+
protected DrillFilterRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
super(cluster, traits, child, condition);
assert getConvention() == convention;
+
+ // save the number of conjuncts that make up the filter condition such
+ // that repeated calls to the costing function can use the saved copy
+ numConjuncts = RelOptUtil.conjunctions(condition).size();
+ assert numConjuncts >= 1;
}
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return super.computeSelfCost(planner).multiplyBy(0.1);
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+ RelNode child = this.getChild();
+ double inputRows = RelMetadataQuery.getRowCount(child);
+ double cpuCost = DrillCostBase.COMPARE_CPU_COST * numConjuncts * inputRows;
+ DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+ return costFactory.makeCost(inputRows, cpuCost, 0, 0);
}
protected LogicalExpression getFilterExpression(DrillParseContext context){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
index b62eb9e..ab6bc02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
@@ -22,14 +22,20 @@ import java.util.List;
import org.apache.drill.common.logical.data.Limit;
import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.logical.DrillImplementor;
import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.torel.ConversionContext;
import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelNode;
import org.eigenbase.rel.RelWriter;
import org.eigenbase.rel.SingleRel;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTraitSet;
import org.eigenbase.rex.RexLiteral;
import org.eigenbase.rex.RexNode;
@@ -56,6 +62,20 @@ public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNod
return this.fetch;
}
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+ // Otherwise cost the limit by offset + fetch rows; a bound that is absent (null) is counted as 0.
+ int off = offset != null ? RexLiteral.intValue(offset) : 0 ;
+ int f = fetch != null ? RexLiteral.intValue(fetch) : 0 ;
+ double numRows = (double) off + f; // widen before adding: an int sum of two large literals would overflow and go negative
+ double cpuCost = DrillCostBase.COMPARE_CPU_COST * numRows;
+ DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+ return costFactory.makeCost(numRows, cpuCost, 0, 0);
+ }
+
public RelWriter explainTerms(RelWriter pw) {
super.explainTerms(pw);
pw.itemIf("offset", offset, offset != null);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
index cf3d188..4629737 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
@@ -22,10 +22,14 @@ import java.util.List;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.eigenbase.rel.ProjectRelBase;
import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
import org.eigenbase.relopt.Convention;
import org.eigenbase.relopt.RelOptCluster;
import org.eigenbase.relopt.RelOptCost;
@@ -50,7 +54,15 @@ public abstract class DrillProjectRelBase extends ProjectRelBase implements Dril
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return super.computeSelfCost(planner).multiplyBy(0.1);
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+ // With default costing off, build the cost from the row count and the projected field count.
+ // cost is proportional to the number of rows and number of columns being projected
+ double rowCount = RelMetadataQuery.getRowCount(this);
+ double cpuCost = DrillCostBase.PROJECT_CPU_COST * rowCount * getRowType().getFieldCount(); // rows x projected columns
+ DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+ return costFactory.makeCost(rowCount, cpuCost, 0, 0);
}
private List<Pair<RexNode, String>> projects() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
index 88df658..331069d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
@@ -25,7 +25,9 @@ import org.eigenbase.relopt.RelOptTable;
import org.eigenbase.relopt.RelTraitSet;
/**
- * Base class for logical and physical Scans implemented in Drill
+ * Base class for logical scan rel implemented in Drill.
+ * NOTE: we should eventually make this class independent of TableAccessRelBase and then
+ * make it the base class for logical and physical scan rels.
*/
public abstract class DrillScanRelBase extends TableAccessRelBase implements DrillRelNode {
protected final DrillTable drillTable;
@@ -35,4 +37,5 @@ public abstract class DrillScanRelBase extends TableAccessRelBase implements Dri
this.drillTable = table.unwrap(DrillTable.class);
assert drillTable != null;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java
index 51ed442..23b53f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java
@@ -17,8 +17,12 @@
*/
package org.apache.drill.exec.planner.common;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.eigenbase.rel.RelNode;
import org.eigenbase.rel.SingleRel;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
import org.eigenbase.relopt.Convention;
import org.eigenbase.relopt.RelOptCluster;
import org.eigenbase.relopt.RelOptCost;
@@ -37,7 +41,13 @@ public abstract class DrillScreenRelBase extends SingleRel implements DrillRelNo
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return super.computeSelfCost(planner).multiplyBy(.1);
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+ // by default, assume cost is proportional to number of rows
+ double rowCount = RelMetadataQuery.getRowCount(this);
+ DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+ return costFactory.makeCost(rowCount, rowCount, 0, 0).multiplyBy(0.1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
new file mode 100644
index 0000000..412c218
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
@@ -0,0 +1,311 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.planner.cost;
+
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.util.Util;
+
+/**
+ * Implementation of the DrillRelOptCost, modeled similar to VolcanoCost
+ */
+public class DrillCostBase implements DrillRelOptCost {
+
+ /*
+ * NOTE: the multiplication factors below are not calibrated yet; they are
+ * chosen based on approximations for now. For reference purposes,
+ * assume each disk on a server can have a sustained I/O throughput of
+ * 100 MBytes/sec. Suppose there is an array of 16 disks per server: theoretically
+ * one could get 1.6 GBytes/sec. Suppose network speed is 1 GBit/sec, which is
+ * 128 MBytes/sec, although the actual transfer rate over the network may be lower.
+ * We are only concerned with relative costs, not absolute values.
+ * For relative costing, let's assume sending data over the network is
+ * about 16x slower than reading/writing to an array of local disks
+ * (1.6 GBytes/sec vs. 128 MBytes/sec is 12.5x; 16x is the rounded figure used below).
+ * This note applies to all of the cost factors declared in this section.
+ */
+ public static final int BASE_CPU_COST = 1; // base cpu cost per 'operation'; the unit every other factor is a multiple of
+ public static final int BYTE_DISK_READ_COST = 32 * BASE_CPU_COST; // disk read cost per byte (evaluates to 32)
+ public static final int BYTE_NETWORK_COST = 16 * BYTE_DISK_READ_COST; // network transfer cost per byte (evaluates to 512)
+
+
+ public static final int SVR_CPU_COST = 8 * BASE_CPU_COST; // cpu cost for SV (selection vector) remover
+ public static final int FUNC_CPU_COST = 12 * BASE_CPU_COST; // cpu cost for a function evaluation
+
+ // cpu cost for projecting an expression; note that projecting an expression
+ // that is not a simple column or constant may include evaluation, but we
+ // currently don't model it at that level of detail.
+ public static final int PROJECT_CPU_COST = 4 * BASE_CPU_COST;
+
+ // hash cpu cost per field (for now we don't distinguish between fields of different types) involves
+ // the cost of the following operations:
+ // compute hash value, probe hash table, walk hash chain and compare with each element,
+ // add to the end of hash chain if no match found
+ public static final int HASH_CPU_COST = 8 * BASE_CPU_COST;
+
+ // cpu cost factor for range partitioning
+ // NOTE(review): whether this is charged per row or per field is not visible here - confirm at the call sites.
+ public static final int RANGE_PARTITION_CPU_COST = 12 * BASE_CPU_COST;
+
+ // cost of comparing one field with another (ignoring data types for now)
+ public static final int COMPARE_CPU_COST = 4 * BASE_CPU_COST;
+ // assumed average width of a single field; presumably in bytes, since callers multiply
+ // (field count * AVG_FIELD_WIDTH) by the per-byte network cost - TODO confirm
+ public static final int AVG_FIELD_WIDTH = 8;
+
+ /*
+ * Notation legend (not the documentation of any single field).
+ * For the costing formulas in computeSelfCost(), assume the following notations:
+ * Let
+ * C = Cost per node.
+ * k = number of fields on which to distribute on
+ * h = CPU cost of computing hash value on 1 field
+ * s = CPU cost of Selection-Vector remover per row
+ * w = Network cost of sending 1 row to 1 destination
+ * c = CPU cost of comparing an incoming row with one on a heap of size N
+ */
+
+ /**
+ * Shared "infinite" cost: all four components are {@link Double#POSITIVE_INFINITY}.
+ * plus(), minus() and multiplyBy() test for this exact instance by identity,
+ * and DrillCostFactory.makeInfiniteCost() returns it. Prints as "{inf}".
+ */
+ static final DrillCostBase INFINITY =
+ new DrillCostBase(
+ Double.POSITIVE_INFINITY,
+ Double.POSITIVE_INFINITY,
+ Double.POSITIVE_INFINITY,
+ Double.POSITIVE_INFINITY) {
+ public String toString() {
+ return "{inf}";
+ }
+ };
+
+ /**
+ * Shared "huge" cost: all four components are {@link Double#MAX_VALUE}, i.e. very
+ * large but still finite. Returned by DrillCostFactory.makeHugeCost(). Prints as "{huge}".
+ */
+ static final DrillCostBase HUGE =
+ new DrillCostBase(Double.MAX_VALUE,
+ Double.MAX_VALUE,
+ Double.MAX_VALUE,
+ Double.MAX_VALUE) {
+ public String toString() {
+ return "{huge}";
+ }
+ };
+
+ /**
+ * Shared zero cost: every component is 0.0.
+ * Returned by DrillCostFactory.makeZeroCost(). Prints as "{0}".
+ */
+ static final DrillCostBase ZERO =
+ new DrillCostBase(0.0, 0.0, 0.0, 0.0) {
+ public String toString() {
+ return "{0}";
+ }
+ };
+
+ /**
+ * Shared "tiny" cost: one row and one unit of cpu, with no io and no network cost.
+ * Returned by DrillCostFactory.makeTinyCost(). Prints as "{tiny}".
+ */
+ static final DrillCostBase TINY =
+ new DrillCostBase(1.0, 1.0, 0.0, 0.0) {
+ public String toString() {
+ return "{tiny}";
+ }
+ };
+
+ // The four cost components. They are assigned once in the constructor and never modified.
+ final double rowCount;
+ final double cpu;
+ final double io;
+ final double network;
+
+ /**
+ * Creates a cost from its four components. The values are stored as given;
+ * no validation or normalization is performed.
+ *
+ * @param rowCount row-count component
+ * @param cpu cpu component
+ * @param io io component
+ * @param network network component
+ */
+ public DrillCostBase(double rowCount, double cpu, double io, double network) {
+ this.rowCount = rowCount;
+ this.cpu = cpu;
+ this.io = io;
+ this.network = network;
+ }
+
+ /** @return the row-count component of this cost */
+ @Override
+ public double getRows() {
+ return rowCount;
+ }
+
+ /** @return the cpu component of this cost */
+ @Override
+ public double getCpu() {
+ return cpu;
+ }
+
+ /** @return the io component of this cost */
+ @Override
+ public double getIo() {
+ return io;
+ }
+
+ /** @return the network component of this cost */
+ @Override
+ public double getNetwork() {
+ return network;
+ }
+
+ /**
+ * Hash code built as the plain sum of the hashes of the four components
+ * (row count, cpu, io, network); int overflow simply wraps around.
+ */
+ @Override
+ public int hashCode() {
+ int hash = Util.hashCode(rowCount);
+ hash += Util.hashCode(cpu);
+ hash += Util.hashCode(io);
+ hash += Util.hashCode(network);
+ return hash;
+ }
+
+ /**
+ * Tells whether this cost is infinite.
+ *
+ * @return true if this is the shared INFINITY instance, or if any one of the
+ * four components equals {@link Double#POSITIVE_INFINITY}
+ */
+ @Override
+ public boolean isInfinite() {
+ if (this == INFINITY) {
+ return true;
+ }
+ final double inf = Double.POSITIVE_INFINITY;
+ return this.cpu == inf
+ || this.io == inf
+ || this.network == inf
+ || this.rowCount == inf;
+ }
+
+ /**
+ * Exact, component-wise equality with another cost.
+ *
+ * Like VolcanoCost, this compares the individual components; an alternative
+ * would be to add the components up and compare the totals. Note that
+ * VolcanoPlanner mainly uses isLe() and isLt() for cost comparisons, not equals().
+ *
+ * @param other the cost to compare with
+ * @return true if other is this very object, or is a DrillCostBase whose cpu, io,
+ * network and row count are all == to this cost's
+ */
+ @Override
+ public boolean equals(RelOptCost other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof DrillCostBase)) {
+ return false;
+ }
+ final DrillCostBase that = (DrillCostBase) other;
+ return this.cpu == that.cpu
+ && this.io == that.io
+ && this.network == that.network
+ && this.rowCount == that.rowCount;
+ }
+
+ /**
+ * Approximate equality: every component must differ from its counterpart by
+ * strictly less than RelOptUtil.EPSILON.
+ *
+ * @param other the cost to compare with
+ * @return false if other is not a DrillCostBase; true if it is this very object
+ * or all four component differences are below the epsilon
+ */
+ @Override
+ public boolean isEqWithEpsilon(RelOptCost other) {
+ if (!(other instanceof DrillCostBase)) {
+ return false;
+ }
+ final DrillCostBase that = (DrillCostBase) other;
+ if (this == that) {
+ return true;
+ }
+ // Each test is kept in "difference < epsilon" form so that a NaN difference counts as "not close".
+ final boolean cpuClose = Math.abs(this.cpu - that.cpu) < RelOptUtil.EPSILON;
+ final boolean ioClose = Math.abs(this.io - that.io) < RelOptUtil.EPSILON;
+ final boolean networkClose = Math.abs(this.network - that.network) < RelOptUtil.EPSILON;
+ final boolean rowsClose = Math.abs(this.rowCount - that.rowCount) < RelOptUtil.EPSILON;
+ return cpuClose && ioClose && networkClose && rowsClose;
+ }
+
+ /**
+ * "Less than or equal" comparison used when ranking plans.
+ *
+ * @param other the cost to compare with; it is cast to DrillCostBase without a type check
+ * @return true if other is this very object, or if the combined resource cost
+ * (cpu + io + network) AND the row count of this cost are both <= those of other
+ */
+ @Override
+ public boolean isLe(RelOptCost other) {
+ final DrillCostBase that = (DrillCostBase) other;
+ if (this == that) {
+ return true;
+ }
+
+ final double thisResources = this.cpu + this.io + this.network;
+ final double thatResources = that.cpu + that.io + that.network;
+ return thisResources <= thatResources && this.rowCount <= that.rowCount;
+ }
+
+ /**
+ * "Strictly less than" comparison used when ranking plans.
+ *
+ * Only the combined resource cost (cpu + io + network) has to be strictly smaller;
+ * the row count merely has to be no larger, so two costs with the same row count
+ * can still be ordered by their resource cost.
+ *
+ * @param other the cost to compare with; it is cast to DrillCostBase without a type check
+ * @return true if this cost's resource total is < other's and its row count is <= other's
+ */
+ @Override
+ public boolean isLt(RelOptCost other) {
+ final DrillCostBase that = (DrillCostBase) other;
+
+ final double thisResources = this.cpu + this.io + this.network;
+ final double thatResources = that.cpu + that.io + that.network;
+ return thisResources < thatResources && this.rowCount <= that.rowCount;
+ }
+
+ /**
+ * Component-wise sum of this cost and another one.
+ *
+ * @param other the cost to add; it is cast to DrillCostBase without a type check
+ * @return the shared INFINITY instance if either operand is that instance,
+ * otherwise a new cost holding the four component sums
+ */
+ @Override
+ public RelOptCost plus(RelOptCost other) {
+ final DrillCostBase that = (DrillCostBase) other;
+ final boolean eitherIsInfinity = (this == INFINITY) || (that == INFINITY);
+ if (eitherIsInfinity) {
+ return INFINITY;
+ }
+ final double rows = this.rowCount + that.rowCount;
+ final double cpuSum = this.cpu + that.cpu;
+ final double ioSum = this.io + that.io;
+ final double networkSum = this.network + that.network;
+ return new DrillCostBase(rows, cpuSum, ioSum, networkSum);
+ }
+
+ /**
+ * Component-wise difference between this cost and another one.
+ *
+ * @param other the cost to subtract; it is cast to DrillCostBase, but only when
+ * this cost is not the shared INFINITY instance
+ * @return this object unchanged if it is the shared INFINITY instance,
+ * otherwise a new cost holding the four component differences
+ */
+ @Override
+ public RelOptCost minus(RelOptCost other) {
+ // Checked before the cast: INFINITY minus anything stays INFINITY.
+ if (this == INFINITY) {
+ return this;
+ }
+ final DrillCostBase that = (DrillCostBase) other;
+ final double rows = this.rowCount - that.rowCount;
+ final double cpuDiff = this.cpu - that.cpu;
+ final double ioDiff = this.io - that.io;
+ final double networkDiff = this.network - that.network;
+ return new DrillCostBase(rows, cpuDiff, ioDiff, networkDiff);
+ }
+
+ /**
+ * Scales every component of this cost by the same factor.
+ *
+ * @param factor the multiplier applied to row count, cpu, io and network
+ * @return this object unchanged if it is the shared INFINITY instance,
+ * otherwise a new cost holding the scaled components
+ */
+ @Override
+ public RelOptCost multiplyBy(double factor) {
+ if (this == INFINITY) {
+ return this;
+ }
+ final double scaledRows = rowCount * factor;
+ final double scaledCpu = cpu * factor;
+ final double scaledIo = io * factor;
+ final double scaledNetwork = network * factor;
+ return new DrillCostBase(scaledRows, scaledCpu, scaledIo, scaledNetwork);
+ }
+
+ /**
+ * Ratio of this cost to another one, computed as the geometric average of the
+ * per-component ratios.
+ *
+ * A component takes part only when its value is neither zero nor infinite on
+ * both sides. Components are visited in the order row count, cpu, io, network.
+ *
+ * @param cost the divisor; it is cast to DrillCostBase without a type check
+ * @return the geometric average of the usable ratios, or 1.0 if no component is usable
+ */
+ @Override
+ public double divideBy(RelOptCost cost) {
+ final DrillCostBase that = (DrillCostBase) cost;
+ final double[] numerators = {this.rowCount, this.cpu, this.io, this.network};
+ final double[] denominators = {that.rowCount, that.cpu, that.io, that.network};
+
+ double product = 1;
+ double usable = 0;
+ for (int i = 0; i < numerators.length; i++) {
+ final double mine = numerators[i];
+ final double theirs = denominators[i];
+ final boolean unusable = (mine == 0)
+ || Double.isInfinite(mine)
+ || (theirs == 0)
+ || Double.isInfinite(theirs);
+ if (unusable) {
+ continue;
+ }
+ product *= mine / theirs;
+ ++usable;
+ }
+
+ if (usable == 0) {
+ return 1.0;
+ }
+ // 'usable' is a double, so 1 / usable is a floating-point division.
+ return Math.pow(product, 1 / usable);
+ }
+
+ /**
+ * Renders the cost as "{R rows, C cpu, I io, N network}".
+ */
+ @Override
+ public String toString() {
+ final StringBuilder text = new StringBuilder("{");
+ text.append(rowCount).append(" rows, ");
+ text.append(cpu).append(" cpu, ");
+ text.append(io).append(" io, ");
+ text.append(network).append(" network}");
+ return text.toString();
+ }
+
+ /**
+ * Cost factory that produces DrillCostBase objects. The make*Cost() variants without
+ * arguments hand out the shared HUGE, INFINITY, TINY and ZERO instances of DrillCostBase.
+ */
+ public static class DrillCostFactory implements DrillRelOptCostFactory {
+ /** Creates a cost with all four components given explicitly. */
+ public RelOptCost makeCost(double dRows, double dCpu, double dIo, double dNetwork) {
+ return new DrillCostBase(dRows, dCpu, dIo, dNetwork);
+ }
+
+ /** Creates a cost from rows, cpu and io; the network component is set to 0. */
+ public RelOptCost makeCost(double dRows, double dCpu, double dIo) {
+ return new DrillCostBase(dRows, dCpu, dIo, 0);
+ }
+
+ /** @return the shared DrillCostBase.HUGE instance */
+ public RelOptCost makeHugeCost() {
+ return DrillCostBase.HUGE;
+ }
+
+ /** @return the shared DrillCostBase.INFINITY instance */
+ public RelOptCost makeInfiniteCost() {
+ return DrillCostBase.INFINITY;
+ }
+
+ /** @return the shared DrillCostBase.TINY instance */
+ public RelOptCost makeTinyCost() {
+ return DrillCostBase.TINY;
+ }
+
+ /** @return the shared DrillCostBase.ZERO instance */
+ public RelOptCost makeZeroCost() {
+ return DrillCostBase.ZERO;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
new file mode 100644
index 0000000..8f20658
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.planner.cost;
+
+import org.eigenbase.relopt.RelOptCost;
+
+/**
+ * A RelOptCost that additionally exposes a network component.
+ */
+public interface DrillRelOptCost extends RelOptCost {
+
+ /**
+ * @return the network component of this cost
+ */
+ double getNetwork();
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCostFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCostFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCostFactory.java
new file mode 100644
index 0000000..fc20d60
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCostFactory.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+
+package org.apache.drill.exec.planner.cost;
+
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptCostFactory;
+
+/**
+ * A RelOptCostFactory that can also build costs carrying a network component.
+ */
+public interface DrillRelOptCostFactory extends RelOptCostFactory {
+
+ /**
+ * Creates a cost object from four components.
+ *
+ * @param rowCount row-count component
+ * @param cpu cpu component
+ * @param io io component
+ * @param network network component
+ * @return the new cost
+ */
+ RelOptCost makeCost(double rowCount, double cpu, double io, double network);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
index c2833c1..fe5130c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
@@ -49,11 +49,6 @@ public class DrillAggregateRel extends DrillAggregateRelBase implements DrillRel
public DrillAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
List<AggregateCall> aggCalls) throws InvalidRelException {
super(cluster, traits, child, groupSet, aggCalls);
- for (AggregateCall aggCall : aggCalls) {
- if (aggCall.isDistinct()) {
- throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates");
- }
- }
}
public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
index 273237a..5d65e7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
@@ -41,6 +41,12 @@ public class DrillAggregateRule extends RelOptRule {
public void onMatch(RelOptRuleCall call) {
final AggregateRel aggregate = (AggregateRel) call.rel(0);
final RelNode input = call.rel(1);
+
+ if (aggregate.containsDistinctCall()) {
+ // currently, don't use this rule if any of the aggregates contains DISTINCT
+ return;
+ }
+
final RelTraitSet traits = aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
final RelNode convertedInput = convert(input, traits);
try {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 4defacd..0693d22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -22,6 +22,17 @@ import java.util.Iterator;
import net.hydromatic.optiq.tools.RuleSet;
import org.apache.drill.exec.planner.physical.*;
+import org.apache.drill.exec.planner.physical.FilterPrule;
+import org.apache.drill.exec.planner.physical.HashAggPrule;
+import org.apache.drill.exec.planner.physical.HashJoinPrule;
+import org.apache.drill.exec.planner.physical.LimitPrule;
+import org.apache.drill.exec.planner.physical.MergeJoinPrule;
+import org.apache.drill.exec.planner.physical.ProjectPrule;
+import org.apache.drill.exec.planner.physical.ScanPrule;
+import org.apache.drill.exec.planner.physical.ScreenPrule;
+import org.apache.drill.exec.planner.physical.SortConvertPrule;
+import org.apache.drill.exec.planner.physical.SortPrule;
+import org.apache.drill.exec.planner.physical.StreamAggPrule;
import org.eigenbase.rel.RelFactories;
import org.eigenbase.rel.rules.MergeProjectRule;
import org.eigenbase.rel.rules.PushFilterPastJoinRule;
@@ -49,7 +60,7 @@ public class DrillRuleSets {
PushFilterPastJoinRule.FILTER_ON_JOIN,
PushJoinThroughJoinRule.RIGHT,
PushJoinThroughJoinRule.LEFT,
- // End supprot for WHERE style joins.
+ // End support for WHERE style joins.
//Add back rules
@@ -106,7 +117,9 @@ public class DrillRuleSets {
ScreenPrule.INSTANCE,
ExpandConversionRule.INSTANCE,
StreamAggPrule.INSTANCE,
+ HashAggPrule.INSTANCE,
MergeJoinPrule.INSTANCE,
+ HashJoinPrule.INSTANCE,
FilterPrule.INSTANCE,
LimitPrule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 46394a9..ae11564 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.torel.ConversionContext;
import org.eigenbase.relopt.RelOptCluster;
@@ -90,21 +91,39 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
return this.rowType;
}
- @Override
- public RelOptCost computeSelfCost(RelOptPlanner planner) {
- OperatorCost scanCost = groupScan.getCost();
- Size scanSize = groupScan.getSize();
- int columnCount = this.getRowType().getFieldCount();
- // FIXME: Use the new cost model
- return this
- .getCluster()
- .getPlanner()
- .getCostFactory()
- .makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(),
- scanCost.getNetwork() * scanCost.getDisk());
+ @Override
+ public double getRows() {
+ return this.groupScan.getSize().getRecordCount();
}
+ /// TODO: this method is the same as the one for ScanPrel; eventually it and a few
+ /// other methods should be consolidated in a common base class extended by both
+ /// the logical and the physical rels.
+ ///
+ /// Computes the cost of this scan. With default costing enabled, the cost comes from
+ /// the group scan's OperatorCost (cpu and disk), with record count times column count
+ /// as the row component. Otherwise the row component is the group scan's record count,
+ /// cpu is record count times column count, and io and network are 0.
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ final Size size = this.groupScan.getSize();
+ final int numColumns = this.getRowType().getFieldCount();
+
+ final boolean defaultCosting = PrelUtil.getSettings(getCluster()).useDefaultCosting();
+ if (defaultCosting) {
+ final OperatorCost opCost = this.groupScan.getCost();
+ return planner.getCostFactory().makeCost(size.getRecordCount() * numColumns, opCost.getCpu(), opCost.getDisk());
+ }
+
+ // The row count is taken from the group scan rather than from RelMetadataQuery.getRowCount(this).
+ final double rows = size.getRecordCount();
+
+ // For now, assume cpu cost is proportional to row count (times the number of columns).
+ final double cpu = rows * numColumns;
+
+ // Even though the scan reads from disk, all currently generated plans need to read
+ // the same amount of data, so keeping the disk io cost at 0 is ok for now.
+ // In the future we might consider alternative scans that go against projections or
+ // different compression schemes etc. that affect the amount of data read. Such
+ // alternatives would affect both cpu and io cost.
+ final double io = 0;
+
+ final DrillCostFactory drillCostFactory = (DrillCostFactory)planner.getCostFactory();
+ return drillCostFactory.makeCost(rows, cpu, io, 0);
+ }
+
public GroupScan getGroupScan() {
return groupScan;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
new file mode 100644
index 0000000..235018d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import java.util.List;
+
+import net.hydromatic.optiq.util.BitSets;
+
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleOperand;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Abstract base class for the aggregation physical rules.
+ */
+public abstract class AggPruleBase extends RelOptRule {
+
+ /**
+ * Passes the operand and the description straight on to RelOptRule.
+ */
+ protected AggPruleBase(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ /**
+ * Turns the grouping columns of an aggregate into distribution fields.
+ *
+ * @param rel the aggregate whose group set is read
+ * @param allFields true to return one field per grouping column; false to return
+ * only the first grouping column
+ * @return the distribution fields, in group-set iteration order; empty when the
+ * aggregate has no grouping columns
+ */
+ protected List<DistributionField> getDistributionField(DrillAggregateRel rel, boolean allFields) {
+ final List<DistributionField> distFields = Lists.newArrayList();
+
+ for (int groupIndex : BitSets.toIter(rel.getGroupSet())) {
+ distFields.add(new DistributionField(groupIndex));
+
+ if (!allFields) {
+ // Only one grouping field is wanted: keep the first one for now. Once
+ // number-of-distinct-values (NDV) statistics are available, the field
+ // with the highest NDV should be picked instead.
+ break;
+ }
+ }
+
+ return distFields;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
new file mode 100644
index 0000000..292c1ba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * Physical rel for a broadcast exchange over a single input.
+ */
+public class BroadcastExchangePrel extends SingleRel implements Prel {
+
+ /**
+ * @param cluster cluster this rel belongs to
+ * @param traitSet traits of this rel
+ * @param input the child rel; asserted to be in the DRILL_PHYSICAL convention
+ */
+ public BroadcastExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
+ super(cluster, traitSet, input);
+ assert input.getConvention() == Prel.DRILL_PHYSICAL;
+ }
+
+ /**
+ * In a BroadcastExchange, each sender is sending data to N receivers (for costing
+ * purposes we assume it is also sending to itself).
+ *
+ * With default costing enabled this returns one tenth of the inherited cost.
+ * Otherwise the cpu cost is SVR_CPU_COST per input row, and the network cost is
+ * BYTE_NETWORK_COST times input rows, times the estimated row width
+ * (field count * AVG_FIELD_WIDTH), times the number of endpoints. The io cost is 0.
+ */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+
+ RelNode child = this.getChild();
+
+ double inputRows = RelMetadataQuery.getRowCount(child);
+ int rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
+ double cpuCost = DrillCostBase.SVR_CPU_COST * inputRows;
+ int numEndPoints = PrelUtil.getSettings(getCluster()).numEndPoints();
+ // every input row is sent to each of the endpoints
+ double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints;
+ return new DrillCostBase(inputRows, cpuCost, 0, networkCost);
+ }
+
+ /**
+ * Creates a copy of this rel with the given traits on top of the single given input.
+ */
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new BroadcastExchangePrel(getCluster(), traitSet, sole(inputs));
+ }
+
+ /**
+ * Builds the BroadcastExchange physical operator on top of the child's operator.
+ * If the child operator's selection-vector mode is not NONE, a
+ * SelectionVectorRemover is placed between the child and the exchange.
+ *
+ * @throws IOException declared by the Prel contract; propagated from the child's
+ * getPhysicalOperator call
+ */
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+ Prel child = (Prel) this.getChild();
+
+ PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+ //Currently, only accepts "NONE". For other, requires SelectionVectorRemover
+ if (!childPOP.getSVMode().equals(SelectionVectorMode.NONE)) {
+ childPOP = new SelectionVectorRemover(childPOP);
+ }
+
+ return new BroadcastExchange(childPOP);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
index b75fb40..abd50d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.planner.physical;
-import org.eigenbase.rel.RelFieldCollation;
import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTrait;
import org.eigenbase.relopt.RelTraitDef;
@@ -36,8 +35,8 @@ public class DrillDistributionTrait implements RelTrait {
private DistributionType type;
private final ImmutableList<DistributionField> fields;
-
- private DrillDistributionTrait(DistributionType type) {
+
+ public DrillDistributionTrait(DistributionType type) {
assert (type == DistributionType.SINGLETON || type == DistributionType.RANDOM_DISTRIBUTED || type == DistributionType.ANY
|| type == DistributionType.ROUND_ROBIN_DISTRIBUTED || type == DistributionType.BROADCAST_DISTRIBUTED);
this.type = type;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
index c2ebb7a..06c2319 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
@@ -17,9 +17,6 @@
*/
package org.apache.drill.exec.planner.physical;
-import org.eigenbase.rel.RelCollation;
-import org.eigenbase.rel.RelCollationImpl;
-import org.eigenbase.rel.RelCollationTraitDef;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTraitDef;
@@ -47,7 +44,7 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
public String getSimpleName() {
return this.getClass().getSimpleName();
}
-
+
// implement RelTraitDef
public RelNode convert(
RelOptPlanner planner,
@@ -66,19 +63,21 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
// We do not want to convert from "ANY", since it's abstract.
// Source trait should be concrete type: SINGLETON, HASH_DISTRIBUTED, etc.
if (currentDist.equals(DrillDistributionTrait.DEFAULT)) {
- return null;
+ return null;
}
- RelCollation collation = null;
-
switch(toDist.getType()){
- // UnionExchange, HashToRandomExchange, OrderedPartitionExchange destroy the ordering property, therefore RelCollation is set to default, which is EMPTY.
+ // UnionExchange, HashToRandomExchange, OrderedPartitionExchange and BroadcastExchange destroy the ordering property,
+ // therefore RelCollation is set to default, which is EMPTY.
case SINGLETON:
return new UnionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
case HASH_DISTRIBUTED:
- return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel, toDist.getFields());
+ return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel,
+ toDist.getFields());
case RANGE_DISTRIBUTED:
return new OrderedPartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
+ case BROADCAST_DISTRIBUTED:
+ return new BroadcastExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 0fc3abd..99a0cdc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -24,9 +24,14 @@ import java.util.List;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.planner.common.DrillFilterRelBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTraitSet;
import org.eigenbase.rex.RexNode;
@@ -40,9 +45,10 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new FilterPrel(getCluster(), traitSet, sole(inputs), getCondition());
}
-
+
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+
Prel child = (Prel) this.getChild();
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
new file mode 100644
index 0000000..066bed0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+
+import com.beust.jcommander.internal.Lists;
+
+public class HashAggPrel extends AggregateRelBase implements Prel{
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggPrel.class);
+
+ public HashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
+ List<AggregateCall> aggCalls) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls);
+ }
+
+ public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) { // builds a new HashAggPrel from the supplied traits, input, grouping and aggregate calls
+ try {
+ return new HashAggPrel(getCluster(), traitSet, input, groupSet, aggCalls); // use the caller-supplied groupSet; previously this node's own group set was reused and the argument ignored
+ } catch (InvalidRelException e) {
+ throw new AssertionError(e); // the constructor declares InvalidRelException; rethrow unchecked, passing the exception along
+ }
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+ RelNode child = this.getChild();
+ double inputRows = RelMetadataQuery.getRowCount(child);
+
+ int numGroupByFields = this.getGroupCount();
+ int numAggrFields = this.aggCalls.size();
+ // cpu cost of hashing each grouping key
+ double cpuCost = DrillCostBase.HASH_CPU_COST * numGroupByFields * inputRows;
+ // add cpu cost for computing the aggregate functions
+ cpuCost += DrillCostBase.FUNC_CPU_COST * numAggrFields * inputRows;
+ double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints
+ DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+ return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0 /* network cost */);
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+
+ final List<String> childFields = getChild().getRowType().getFieldNames();
+ final List<String> fields = getRowType().getFieldNames();
+ List<NamedExpression> keys = Lists.newArrayList();
+ List<NamedExpression> exprs = Lists.newArrayList();
+
+ for (int group : BitSets.toIter(groupSet)) {
+ FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+ keys.add(new NamedExpression(fr, fr));
+ }
+
+ for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
+ FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
+ LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
+ exprs.add(new NamedExpression(expr, ref));
+ }
+
+ Prel child = (Prel) this.getChild();
+ HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator),
+ keys.toArray(new NamedExpression[keys.size()]),
+ exprs.toArray(new NamedExpression[exprs.size()]),
+ 1.0f);
+
+ return g;
+
+ }
+
+ private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) { // turns one aggregate call into a FunctionCall; fn supplies the field names for the argument indexes, pContext is not used in this body
+ List<LogicalExpression> args = Lists.newArrayList();
+ for (Integer i : call.getArgList()) {
+ args.add(new FieldReference(fn.get(i))); // each argument index is looked up in fn to get the referenced field name
+ }
+
+ // for count(1): an aggregate call with no arguments gets the constant 1 as its sole argument.
+ if (args.isEmpty()) { args.add(new ValueExpressions.LongExpression(1L)); } // upper-case L suffix: "1l" is easily misread as 11
+ LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
+ return expr;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
new file mode 100644
index 0000000..22b33ea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.logging.Logger;
+
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.trace.EigenbaseTrace;
+
+import com.google.common.collect.ImmutableList;
+
+public class HashAggPrule extends AggPruleBase {
+ public static final RelOptRule INSTANCE = new HashAggPrule();
+ protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
+
+ private HashAggPrule() {
+ super(RelOptHelper.some(DrillAggregateRel.class, RelOptHelper.any(DrillRel.class)), "Prel.HashAggPrule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) { // registers HashAggPrel alternatives for the matched logical aggregate, one per candidate distribution
+ final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
+ final RelNode input = call.rel(1);
+
+ if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
+ // currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or
+ // if there are no grouping keys
+ return;
+ }
+
+ DrillDistributionTrait toDist = null;
+ RelTraitSet traits = null;
+
+ try {
+ if (aggregate.getGroupSet().isEmpty()) { // NOTE(review): looks unreachable - the guard above already returns when getGroupCount() == 0, assuming that is the cardinality of getGroupSet(); confirm
+ toDist = DrillDistributionTrait.SINGLETON;
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ createTransformRequest(call, aggregate, input, traits);
+ } else {
+ // hash distribute on all grouping keys
+ toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
+
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ createTransformRequest(call, aggregate, input, traits);
+
+ // hash distribute on single grouping key
+ toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
+
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ createTransformRequest(call, aggregate, input, traits);
+
+ ///TODO: 2 phase hash aggregate plan
+ }
+ } catch (InvalidRelException e) {
+ tracer.warning(e.toString()); // an InvalidRelException is only logged to the planner tracer; requests already made before the failure are left in place
+ }
+ }
+
+ private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate,
+ RelNode input, RelTraitSet traits) throws InvalidRelException {
+
+ final RelNode convertedInput = convert(input, traits);
+
+ HashAggPrel newAgg = new HashAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ call.transformTo(newAgg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
new file mode 100644
index 0000000..7c68c8d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRelBase;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.util.Pair;
+
+import com.beust.jcommander.internal.Lists;
+
+public class HashJoinPrel extends DrillJoinRelBase implements Prel {
+
+ public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType) throws InvalidRelException {
+ super(cluster, traits, left, right, condition, joinType);
+
+ RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
+ }
+
+
+ @Override
+ public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType) {
+ try {
+ return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType);
+ }catch (InvalidRelException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+ double probeRowCount = RelMetadataQuery.getRowCount(this.getLeft());
+ double buildRowCount = RelMetadataQuery.getRowCount(this.getRight());
+
+ // cpu cost of hashing the join keys for the build side
+ double cpuCostBuild = DrillCostBase.HASH_CPU_COST * getRightKeys().size() * buildRowCount;
+ // cpu cost of hashing the join keys for the probe side
+ double cpuCostProbe = DrillCostBase.HASH_CPU_COST * getLeftKeys().size() * probeRowCount;
+
+ // cpu cost of evaluating each leftkey=rightkey join condition
+ double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * this.getLeftKeys().size();
+
+ double cpuCost = joinConditionCost * (buildRowCount + probeRowCount) + cpuCostBuild + cpuCostProbe;
+ DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+ return costFactory.makeCost(buildRowCount + probeRowCount, cpuCost, 0, 0);
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+ final List<String> fields = getRowType().getFieldNames();
+ assert isUnique(fields);
+ final int leftCount = left.getRowType().getFieldCount();
+ final List<String> leftFields = fields.subList(0, leftCount);
+ final List<String> rightFields = fields.subList(leftCount, fields.size());
+
+ PhysicalOperator leftPop = implementInput(creator, 0, left);
+ PhysicalOperator rightPop = implementInput(creator, leftCount, right);
+
+ //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover
+ leftPop = PrelUtil.removeSvIfRequired(leftPop, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
+
+ //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover
+ rightPop = PrelUtil.removeSvIfRequired(rightPop, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
+
+ JoinRelType jtype = this.getJoinType();
+
+ List<JoinCondition> conditions = Lists.newArrayList();
+
+ for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
+ conditions.add(new JoinCondition("==", new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right))));
+ }
+
+ HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype);
+
+ return hjoin;
+ }
+
+ public List<Integer> getLeftKeys() {
+ return this.leftKeys;
+ }
+
+ public List<Integer> getRightKeys() {
+ return this.rightKeys;
+ }
+
+ /**
+ * Check to make sure that the fields of the inputs are the same as the output field names. If not, insert a project renaming them.
+ * @param creator the plan creator handed to the input's getPhysicalOperator call
+ * @param offset index of the input's first field within this join's row type (callers pass 0 for the left input and the left field count for the right input)
+ * @param input the join input to implement; it is cast to Prel
+ * @return the input's physical operator, or the result of rename() when the input and output field names differ
+ * @throws IOException declared because Prel.getPhysicalOperator declares it
+ */
+ private PhysicalOperator implementInput(PhysicalPlanCreator creator, int offset, RelNode input) throws IOException {
+ final PhysicalOperator inputOp = ((Prel) input).getPhysicalOperator(creator);
+ assert uniqueFieldNames(input.getRowType());
+ final List<String> fields = getRowType().getFieldNames();
+ final List<String> inputFields = input.getRowType().getFieldNames();
+ final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+ if (!outputFields.equals(inputFields)) {
+ // Ensure that input field names are the same as output field names.
+ // If there are duplicate field names on left and right, fields will get
+ // lost.
+ return rename(creator, inputOp, inputFields, outputFields);
+ } else {
+ return inputOp;
+ }
+ }
+
+ private PhysicalOperator rename(PhysicalPlanCreator creator, PhysicalOperator inputOp, List<String> inputFields, List<String> outputFields) {
+ List<NamedExpression> exprs = Lists.newArrayList();
+
+ //Currently, Project only accepts "NONE". For other, requires SelectionVectorRemover
+ inputOp = PrelUtil.removeSvIfRequired(inputOp, SelectionVectorMode.NONE);
+
+ for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) {
+ exprs.add(new NamedExpression(new FieldReference(pair.left), new FieldReference(pair.right)));
+ }
+
+ Project proj = new Project(exprs, inputOp);
+
+ return proj;
+ }
+
+
+}