You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2018/03/14 04:37:37 UTC
[6/9] phoenix git commit: PHOENIX-1556 Base hash versus sort merge
join decision on cost
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index a15ab35..21cbc2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -35,6 +35,10 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.expression.aggregator.Aggregators;
@@ -90,25 +94,30 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
@Override
public Cost getCost() {
- Long byteCount = null;
- try {
- byteCount = getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
-
- if (byteCount == null) {
+ Double outputBytes = this.accept(new ByteCountVisitor());
+ Double inputRows = this.getDelegate().accept(new RowCountVisitor());
+ Double rowWidth = this.accept(new AvgRowWidthVisitor());
+ if (inputRows == null || outputBytes == null || rowWidth == null) {
return Cost.UNKNOWN;
}
+ double inputBytes = inputRows * rowWidth;
+ double rowsBeforeHaving = RowCountVisitor.aggregate(
+ RowCountVisitor.filter(
+ inputRows.doubleValue(),
+ RowCountVisitor.stripSkipScanFilter(
+ context.getScan().getFilter())),
+ groupBy);
+ double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
+ double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+ double bytesAfterHaving = rowWidth * rowsAfterHaving;
int parallelLevel = CostUtil.estimateParallelLevel(
false, context.getConnection().getQueryServices());
- Cost cost = CostUtil.estimateAggregateCost(byteCount,
- groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel);
+ Cost cost = CostUtil.estimateAggregateCost(
+ inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
if (!orderBy.getOrderByExpressions().isEmpty()) {
- double outputBytes = CostUtil.estimateAggregateOutputBytes(
- byteCount, groupBy, clientAggregators.getEstimatedByteSize());
- Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+ Cost orderByCost = CostUtil.estimateOrderByCost(
+ bytesAfterHaving, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return super.getCost().plus(cost);
@@ -210,7 +219,16 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
public GroupBy getGroupBy() {
return groupBy;
}
-
+
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ public Expression getHaving() {
+ return having;
+ }
+
private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
private final List<Expression> groupByExpressions;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
index ac43919..75ba8f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -85,4 +85,8 @@ public abstract class ClientProcessingPlan extends DelegateQueryPlan {
public FilterableStatement getStatement() {
return statement;
}
+
+ public Expression getWhere() {
+ return where;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 5799990..3427f5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -26,6 +26,8 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.FilterResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -53,28 +55,30 @@ public class ClientScanPlan extends ClientProcessingPlan {
@Override
public Cost getCost() {
- Long byteCount = null;
- try {
- byteCount = getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
+ Double inputBytes = this.getDelegate().accept(new ByteCountVisitor());
+ Double outputBytes = this.accept(new ByteCountVisitor());
- if (byteCount == null) {
+ if (inputBytes == null || outputBytes == null) {
return Cost.UNKNOWN;
}
- Cost cost = new Cost(0, 0, byteCount);
int parallelLevel = CostUtil.estimateParallelLevel(
false, context.getConnection().getQueryServices());
+ Cost cost = new Cost(0, 0, 0);
if (!orderBy.getOrderByExpressions().isEmpty()) {
- Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+ Cost orderByCost =
+ CostUtil.estimateOrderByCost(inputBytes, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return super.getCost().plus(cost);
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 270ad3d..e3e0264 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -28,6 +28,9 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.optimize.Cost;
@@ -202,19 +205,18 @@ public class CorrelatePlan extends DelegateQueryPlan {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ public QueryPlan getRhsPlan() {
+ return rhs;
+ }
+
+ @Override
public Cost getCost() {
- Long lhsByteCount = null;
- try {
- lhsByteCount = delegate.getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
- Long rhsRowCount = null;
- try {
- rhsRowCount = rhs.getEstimatedRowsToScan();
- } catch (SQLException e) {
- // ignored.
- }
+ Double lhsByteCount = delegate.accept(new ByteCountVisitor());
+ Double rhsRowCount = rhs.accept(new RowCountVisitor());
if (lhsByteCount == null || rhsRowCount == null) {
return Cost.UNKNOWN;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
index cf0a3cf..0ecf74d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.iterate.CursorResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -51,6 +52,11 @@ public class CursorFetchPlan extends DelegateQueryPlan {
return resultIterator;
}
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public ExplainPlan getExplainPlan() throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 23a0da6..6ade42e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -48,6 +48,9 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.InListExpression;
@@ -63,10 +66,7 @@ import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.ParseNode;
-import org.apache.phoenix.parse.SQLParser;
-import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.*;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -77,6 +77,7 @@ import org.apache.phoenix.schema.types.PArrayDataType;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.SQLCloseables;
import com.google.common.collect.Lists;
@@ -92,6 +93,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
private final boolean recompileWhereClause;
private final Set<TableRef> tableRefs;
private final int maxServerCacheTimeToLive;
+ private final long serverCacheLimit;
private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap();
private HashCacheClient hashClient;
private AtomicLong firstJobEndTime;
@@ -132,8 +134,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
for (SubPlan subPlan : subPlans) {
tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs());
}
- this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt(
+ QueryServices services = plan.getContext().getConnection().getQueryServices();
+ this.maxServerCacheTimeToLive = services.getProps().getInt(
QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+ this.serverCacheLimit = services.getProps().getLong(
+ QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
}
@Override
@@ -270,40 +275,101 @@ public class HashJoinPlan extends DelegateQueryPlan {
return statement;
}
+ public HashJoinInfo getJoinInfo() {
+ return joinInfo;
+ }
+
+ public SubPlan[] getSubPlans() {
+ return subPlans;
+ }
+
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public Cost getCost() {
- Long byteCount = null;
try {
- byteCount = getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
+ Long r = delegate.getEstimatedRowsToScan();
+ Double w = delegate.accept(new AvgRowWidthVisitor());
+ if (r == null || w == null) {
+ return Cost.UNKNOWN;
+ }
- if (byteCount == null) {
- return Cost.UNKNOWN;
- }
+ int parallelLevel = CostUtil.estimateParallelLevel(
+ true, getContext().getConnection().getQueryServices());
+
+ double rowWidth = w;
+ double rows = RowCountVisitor.filter(
+ r.doubleValue(),
+ RowCountVisitor.stripSkipScanFilter(
+ delegate.getContext().getScan().getFilter()));
+ double bytes = rowWidth * rows;
+ Cost cost = Cost.ZERO;
+ double rhsByteSum = 0.0;
+ for (int i = 0; i < subPlans.length; i++) {
+ double lhsBytes = bytes;
+ Double rhsRows = subPlans[i].getInnerPlan().accept(new RowCountVisitor());
+ Double rhsWidth = subPlans[i].getInnerPlan().accept(new AvgRowWidthVisitor());
+ if (rhsRows == null || rhsWidth == null) {
+ return Cost.UNKNOWN;
+ }
+ double rhsBytes = rhsWidth * rhsRows;
+ rows = RowCountVisitor.join(rows, rhsRows, joinInfo.getJoinTypes()[i]);
+ rowWidth = AvgRowWidthVisitor.join(rowWidth, rhsWidth, joinInfo.getJoinTypes()[i]);
+ bytes = rowWidth * rows;
+ cost = cost.plus(CostUtil.estimateHashJoinCost(
+ lhsBytes, rhsBytes, bytes, subPlans[i].hasKeyRangeExpression(), parallelLevel));
+ rhsByteSum += rhsBytes;
+ }
- Cost cost = new Cost(0, 0, byteCount);
- Cost lhsCost = delegate.getCost();
- if (keyRangeExpressions != null) {
- // The selectivity of the dynamic rowkey filter.
- // TODO replace the constant with an estimate value.
- double selectivity = 0.01;
- lhsCost = lhsCost.multiplyBy(selectivity);
- }
- Cost rhsCost = Cost.ZERO;
- for (SubPlan subPlan : subPlans) {
- rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+ if (rhsByteSum > serverCacheLimit) { // RHS estimate exceeds the configured max server cache size
+ return Cost.UNKNOWN;
+ }
+
+ // Calculate the cost of aggregation and ordering that is performed with the HashJoinPlan
+ if (delegate instanceof AggregatePlan) {
+ AggregatePlan aggPlan = (AggregatePlan) delegate;
+ double rowsBeforeHaving = RowCountVisitor.aggregate(rows, aggPlan.getGroupBy());
+ double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, aggPlan.getHaving());
+ double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+ double bytesAfterHaving = rowWidth * rowsAfterHaving;
+ Cost aggCost = CostUtil.estimateAggregateCost(
+ bytes, bytesBeforeHaving, aggPlan.getGroupBy(), parallelLevel);
+ cost = cost.plus(aggCost);
+ rows = rowsAfterHaving;
+ bytes = bytesAfterHaving;
+ }
+ double outputRows = RowCountVisitor.limit(rows, delegate.getLimit());
+ double outputBytes = rowWidth * outputRows;
+ if (!delegate.getOrderBy().getOrderByExpressions().isEmpty()) {
+ int parallelLevel2 = CostUtil.estimateParallelLevel(
+ delegate instanceof ScanPlan, getContext().getConnection().getQueryServices());
+ Cost orderByCost = CostUtil.estimateOrderByCost(
+ bytes, outputBytes, parallelLevel2);
+ cost = cost.plus(orderByCost);
+ }
+
+ // Calculate the cost of child nodes
+ Cost lhsCost = new Cost(0, 0, r.doubleValue() * w);
+ Cost rhsCost = Cost.ZERO;
+ for (SubPlan subPlan : subPlans) {
+ rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+ }
+ return cost.plus(lhsCost).plus(rhsCost);
+ } catch (SQLException e) { // deliberately ignored: no row estimate, so fall through to Cost.UNKNOWN
}
- return cost.plus(lhsCost).plus(rhsCost);
+ return Cost.UNKNOWN;
}
- protected interface SubPlan {
+ public interface SubPlan {
public ServerCache execute(HashJoinPlan parent) throws SQLException;
public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
public List<String> getPreSteps(HashJoinPlan parent) throws SQLException;
public List<String> getPostSteps(HashJoinPlan parent) throws SQLException;
public QueryPlan getInnerPlan();
+ public boolean hasKeyRangeExpression();
}
public static class WhereClauseSubPlan implements SubPlan {
@@ -383,6 +449,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
public QueryPlan getInnerPlan() {
return plan;
}
+
+ @Override
+ public boolean hasKeyRangeExpression() {
+ return false;
+ }
}
public static class HashSubPlan implements SubPlan {
@@ -495,6 +566,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
public QueryPlan getInnerPlan() {
return plan;
}
+
+ @Override
+ public boolean hasKeyRangeExpression() {
+ return keyRangeLhsExpression != null;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index c9abb69..255fca3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -81,6 +82,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, final Map<ImmutableBytesPtr,ServerCache> caches)
throws SQLException {
ResultIterator scanner = new ResultIterator() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d63950c..ed145a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -37,6 +37,8 @@ import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.BaseResultIterators;
@@ -202,16 +204,17 @@ public class ScanPlan extends BaseQueryPlan {
} catch (SQLException e) {
// ignored.
}
+ Double outputBytes = this.accept(new ByteCountVisitor());
- if (byteCount == null) {
+ if (byteCount == null || outputBytes == null) {
return Cost.UNKNOWN;
}
- Cost cost = new Cost(0, 0, byteCount);
int parallelLevel = CostUtil.estimateParallelLevel(
true, context.getConnection().getQueryServices());
+ Cost cost = new Cost(0, 0, byteCount);
if (!orderBy.getOrderByExpressions().isEmpty()) {
- Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+ Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return cost;
@@ -320,6 +323,11 @@ public class ScanPlan extends BaseQueryPlan {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Long getEstimatedRowsToScan() throws SQLException {
if (isSerial) {
return serialRowsEstimate;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 2436d1e..978c7b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -47,6 +47,8 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.MappedByteBufferQueue;
@@ -171,12 +173,7 @@ public class SortMergeJoinPlan implements QueryPlan {
@Override
public Cost getCost() {
- Long byteCount = null;
- try {
- byteCount = getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
+ Double byteCount = this.accept(new ByteCountVisitor());
if (byteCount == null) {
return Cost.UNKNOWN;
@@ -255,7 +252,11 @@ public class SortMergeJoinPlan implements QueryPlan {
public boolean isRowKeyOrdered() {
return false;
}
-
+
+ public JoinType getJoinType() {
+ return type;
+ }
+
private static SQLException closeIterators(ResultIterator lhsIterator, ResultIterator rhsIterator) {
SQLException e = null;
try {
@@ -717,6 +718,11 @@ public class SortMergeJoinPlan implements QueryPlan {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Set<TableRef> getSourceRefs() {
return tableRefs;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index f42af56..f869a4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.DelegateResultIterator;
import org.apache.phoenix.iterate.FilterResultIterator;
@@ -78,4 +79,9 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
return iterator;
}
+
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 3b5168c..6114d66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -112,6 +113,10 @@ public class UnionPlan implements QueryPlan {
return iterators.getScans();
}
+ public List<QueryPlan> getSubPlans() {
+ return plans;
+ }
+
@Override
public GroupBy getGroupBy() {
return groupBy;
@@ -230,7 +235,12 @@ public class UnionPlan implements QueryPlan {
return false;
}
- @Override
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Operation getOperation() {
return statement.getOperation();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 51cb67e..0bc3df4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.BaseSingleExpression;
import org.apache.phoenix.expression.BaseTerminalExpression;
import org.apache.phoenix.expression.Expression;
@@ -64,6 +65,11 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
return null;
}
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
public class UnnestArrayResultIterator extends DelegateResultIterator {
private final UnnestArrayElemRefExpression elemRefExpression;
private final UnnestArrayElemIndexExpression elemIndexExpression;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
new file mode 100644
index 0000000..9525747
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.parse.JoinTableNode;
+
+import java.sql.SQLException;
+
+/**
+ * Implementation of QueryPlanVisitor used to get the average number of bytes per
+ * row of a QueryPlan. A null result means the width could not be estimated.
+ */
+public class AvgRowWidthVisitor implements QueryPlanVisitor<Double> {
+
+ @Override
+ public Double defaultReturn(QueryPlan plan) {
+ return null;
+ }
+
+ @Override
+ public Double visit(AggregatePlan plan) {
+ try {
+ Long byteCount = plan.getEstimatedBytesToScan();
+ Long rowCount = plan.getEstimatedRowsToScan();
+ if (byteCount != null && rowCount != null) {
+ if (byteCount == 0) {
+ return 0.0;
+ }
+ if (rowCount != 0) {
+ return ((double) byteCount) / rowCount;
+ }
+ }
+ } catch (SQLException e) { // deliberately ignored: fall through and report the width as unknown
+ }
+
+ return null;
+ }
+
+ @Override
+ public Double visit(ScanPlan plan) {
+ try {
+ Long byteCount = plan.getEstimatedBytesToScan();
+ Long rowCount = plan.getEstimatedRowsToScan();
+ if (byteCount != null && rowCount != null) {
+ if (byteCount == 0) {
+ return 0.0;
+ }
+ if (rowCount != 0) {
+ return ((double) byteCount) / rowCount;
+ }
+ }
+ } catch (SQLException e) { // deliberately ignored: fall through and report the width as unknown
+ }
+
+ return null;
+ }
+
+ @Override
+ public Double visit(ClientAggregatePlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public Double visit(ClientScanPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public Double visit(LiteralResultIterationPlan plan) {
+ return (double) plan.getEstimatedSize();
+ }
+
+ @Override
+ public Double visit(TupleProjectionPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public Double visit(HashJoinPlan plan) {
+ Double lhsWidth = plan.getDelegate().accept(this);
+ if (lhsWidth == null) {
+ return null;
+ }
+ JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes();
+ HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans();
+ Double width = lhsWidth;
+ for (int i = 0; i < joinTypes.length; i++) {
+ Double rhsWidth = subPlans[i].getInnerPlan().accept(this);
+ if (rhsWidth == null) {
+ return null;
+ }
+ width = join(width, rhsWidth, joinTypes[i]);
+ }
+
+ return width;
+ }
+
+ @Override
+ public Double visit(SortMergeJoinPlan plan) {
+ Double lhsWidth = plan.getLhsPlan().accept(this);
+ Double rhsWidth = plan.getRhsPlan().accept(this);
+ if (lhsWidth == null || rhsWidth == null) {
+ return null;
+ }
+
+ return join(lhsWidth, rhsWidth, plan.getJoinType());
+ }
+
+ @Override
+ public Double visit(UnionPlan plan) {
+ double sum = 0.0;
+ for (QueryPlan subPlan : plan.getSubPlans()) {
+ Double avgWidth = subPlan.accept(this);
+ if (avgWidth == null) {
+ return null;
+ }
+ sum += avgWidth;
+ }
+ // A union with no sub-plans has nothing to average: report unknown instead of 0.0 / 0 (NaN).
+ return plan.getSubPlans().isEmpty() ? null : sum / plan.getSubPlans().size();
+ }
+
+ @Override
+ public Double visit(UnnestArrayPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public Double visit(CorrelatePlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public Double visit(CursorFetchPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public Double visit(ListJarsQueryPlan plan) {
+ return (double) plan.getEstimatedSize();
+ }
+
+ @Override
+ public Double visit(TraceQueryPlan plan) {
+ return (double) plan.getEstimatedSize();
+ }
+
+
+ /*
+ * The below methods provide estimation of row width based on the input row width as well as
+ * the operator.
+ */
+
+ public static double join(double lhsWidth, double rhsWidth, JoinTableNode.JoinType type) {
+ double width;
+ switch (type) {
+ case Inner:
+ case Left:
+ case Right:
+ case Full: {
+ width = lhsWidth + rhsWidth;
+ break;
+ }
+ case Semi:
+ case Anti: {
+ width = lhsWidth;
+ break;
+ }
+ default: {
+ throw new IllegalArgumentException("Invalid join type: " + type);
+ }
+ }
+ return width;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
new file mode 100644
index 0000000..61a2895
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+
+/**
+ * Implementation of QueryPlanVisitor used to get the number of output bytes for a QueryPlan.
+ */
+public class ByteCountVisitor implements QueryPlanVisitor<Double> {
+
+    /**
+     * Fallback for plans that do not dispatch to a specific visit method.
+     *
+     * @param plan the plan being visited
+     * @return always null, meaning no byte-count estimate is available
+     */
+    @Override
+    public Double defaultReturn(QueryPlan plan) {
+        return null;
+    }
+
+    /*
+     * Every plan type is estimated the same way: estimated row count multiplied by estimated
+     * average row width. The visit methods below only exist to satisfy the visitor interface
+     * and all forward to getByteCountFromRowCountAndRowWidth.
+     */
+
+    @Override
+    public Double visit(AggregatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ScanPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(LiteralResultIterationPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(UnionPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ListJarsQueryPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(TraceQueryPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    /**
+     * Computes the output byte estimate of a plan as row count times average row width, each
+     * obtained by running a fresh {@link RowCountVisitor} and {@link AvgRowWidthVisitor} over
+     * the plan.
+     *
+     * @param plan the plan to estimate
+     * @return the product of the two estimates, or null if either of them is null
+     */
+    protected Double getByteCountFromRowCountAndRowWidth(QueryPlan plan) {
+        Double rows = plan.accept(new RowCountVisitor());
+        Double width = plan.accept(new AvgRowWidthVisitor());
+        boolean bothKnown = rows != null && width != null;
+        if (!bothKnown) {
+            return null;
+        }
+
+        return rows * width;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
new file mode 100644
index 0000000..a7ae3af
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.*;
+
+/**
+ *
+ * Visitor for a QueryPlan (which may contain other nested query-plans)
+ *
+ */
+public interface QueryPlanVisitor<E> {
+ // E is the result type every method of the visitor produces.
+
+ // Generic fallback: QueryPlan.accept implementations that have no type-specific overload
+ // below call this with themselves as the argument.
+ E defaultReturn(QueryPlan plan);
+
+ // One overload per concrete plan type the visitor knows how to handle.
+ E visit(AggregatePlan plan);
+ E visit(ScanPlan plan);
+ E visit(ClientAggregatePlan plan);
+ E visit(ClientScanPlan plan);
+ E visit(LiteralResultIterationPlan plan);
+ E visit(TupleProjectionPlan plan);
+ E visit(HashJoinPlan plan);
+ E visit(SortMergeJoinPlan plan);
+ E visit(UnionPlan plan);
+ E visit(UnnestArrayPlan plan);
+ E visit(CorrelatePlan plan);
+ E visit(CursorFetchPlan plan);
+ E visit(ListJarsQueryPlan plan);
+ E visit(TraceQueryPlan plan);
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
new file mode 100644
index 0000000..58ceea9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.phoenix.compile.GroupByCompiler;
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.filter.BooleanExpressionFilter;
+import org.apache.phoenix.parse.JoinTableNode;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of QueryPlanVisitor used to get the number of output rows for a QueryPlan.
+ */
+public class RowCountVisitor implements QueryPlanVisitor<Double> {
+
+ // An estimate of the ratio of result data from group-by against the input data.
+ private final static double GROUPING_FACTOR = 0.1;
+
+ private final static double OUTER_JOIN_FACTOR = 1.15;
+ private final static double INNER_JOIN_FACTOR = 0.85;
+ private final static double SEMI_OR_ANTI_JOIN_FACTOR = 0.5;
+
+ private final static double UNION_DISTINCT_FACTOR = 0.8;
+
+ /**
+ * Fallback used by plans that call {@code visitor.defaultReturn(this)} from their accept
+ * method.
+ *
+ * @param plan the plan being visited
+ * @return always null, meaning no row-count estimate is available
+ */
+ @Override
+ public Double defaultReturn(QueryPlan plan) {
+ return null;
+ }
+
+    /**
+     * Estimates the output rows of a server-side aggregate by applying, in order, the scan
+     * filter, the group-by, the HAVING clause and the LIMIT to the estimated rows to scan.
+     *
+     * @param plan the aggregate plan
+     * @return the estimated row count, or null when the rows-to-scan estimate is missing or a
+     *         SQLException is raised while collecting the inputs
+     */
+    @Override
+    public Double visit(AggregatePlan plan) {
+        try {
+            Long scannedRows = plan.getEstimatedRowsToScan();
+            if (scannedRows == null) {
+                return null;
+            }
+            Filter scanFilter = stripSkipScanFilter(plan.getContext().getScan().getFilter());
+            double afterScanFilter = filter(scannedRows.doubleValue(), scanFilter);
+            double afterGroupBy = aggregate(afterScanFilter, plan.getGroupBy());
+            double afterHaving = filter(afterGroupBy, plan.getHaving());
+            return limit(afterHaving, plan.getLimit());
+        } catch (SQLException ignored) {
+            // Deliberately swallowed: a failed estimate is reported as "unknown".
+            return null;
+        }
+    }
+
+    /**
+     * Estimates the output rows of a scan by applying the scan filter and then the LIMIT to
+     * the estimated rows to scan.
+     *
+     * @param plan the scan plan
+     * @return the estimated row count, or null when the rows-to-scan estimate is missing or a
+     *         SQLException is raised while collecting the inputs
+     */
+    @Override
+    public Double visit(ScanPlan plan) {
+        try {
+            Long scannedRows = plan.getEstimatedRowsToScan();
+            if (scannedRows == null) {
+                return null;
+            }
+            Filter scanFilter = stripSkipScanFilter(plan.getContext().getScan().getFilter());
+            double afterScanFilter = filter(scannedRows.doubleValue(), scanFilter);
+            return limit(afterScanFilter, plan.getLimit());
+        } catch (SQLException ignored) {
+            // Deliberately swallowed: a failed estimate is reported as "unknown".
+            return null;
+        }
+    }
+
+    /**
+     * Estimates the output rows of a client-side aggregate by applying, in order, the WHERE
+     * expression, the group-by, the HAVING clause and the LIMIT to the delegate's row count.
+     *
+     * @param plan the client aggregate plan
+     * @return the estimated row count, or null when the delegate's estimate is null
+     */
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        Double delegateRows = plan.getDelegate().accept(this);
+        if (delegateRows == null) {
+            return null;
+        }
+
+        double afterWhere = filter(delegateRows.doubleValue(), plan.getWhere());
+        double afterGroupBy = aggregate(afterWhere, plan.getGroupBy());
+        double afterHaving = filter(afterGroupBy, plan.getHaving());
+        return limit(afterHaving, plan.getLimit());
+    }
+
+    /**
+     * Estimates the output rows of a client-side scan: the delegate's row count reduced by the
+     * WHERE expression and capped by the LIMIT.
+     *
+     * A LIMIT is an upper bound on the output, so it only lowers the estimate; it never raises
+     * it above what the filtered input can produce. When the delegate's row count is unknown,
+     * the LIMIT itself (if present) is used as the estimate.
+     *
+     * @param plan the client scan plan
+     * @return the estimated row count, or null when neither the delegate's estimate nor a
+     *         LIMIT is available
+     */
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        Integer limit = plan.getLimit();
+        Double delegateRows = plan.getDelegate().accept(this);
+        if (delegateRows == null) {
+            // Input cardinality unknown: fall back to the LIMIT as the only available bound.
+            return limit == null ? null : Double.valueOf(limit.doubleValue());
+        }
+
+        double filteredRows = filter(delegateRows.doubleValue(), plan.getWhere());
+        if (limit == null) {
+            return filteredRows;
+        }
+        return Math.min(filteredRows, limit.doubleValue());
+    }
+
+ /**
+ * A literal-result plan is estimated at a constant single row.
+ *
+ * @param plan the literal-result plan (not inspected)
+ * @return always 1.0
+ */
+ @Override
+ public Double visit(LiteralResultIterationPlan plan) {
+ return 1.0;
+ }
+
+    /**
+     * Reports the row count of the wrapped plan unchanged.
+     *
+     * @param plan the tuple-projection plan
+     * @return whatever this visitor returns for the plan's delegate (possibly null)
+     */
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        QueryPlan delegate = plan.getDelegate();
+        return delegate.accept(this);
+    }
+
+    /**
+     * Estimates the output rows of a hash join. Starting from the delegate (left-hand) plan's
+     * filtered rows-to-scan estimate, each sub-plan is joined in turn using its join type. If
+     * the delegate is an AggregatePlan, its group-by and HAVING are applied to the joined
+     * rows. The delegate's LIMIT is applied last.
+     *
+     * @param plan the hash-join plan
+     * @return the estimated row count, or null when the delegate's rows-to-scan estimate or any
+     *         sub-plan estimate is missing, or a SQLException is raised while collecting inputs
+     */
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        try {
+            QueryPlan leftPlan = plan.getDelegate();
+            Long scannedRows = leftPlan.getEstimatedRowsToScan();
+            if (scannedRows == null) {
+                return null;
+            }
+
+            Filter scanFilter = stripSkipScanFilter(leftPlan.getContext().getScan().getFilter());
+            double rows = filter(scannedRows.doubleValue(), scanFilter);
+            JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes();
+            HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans();
+            for (int i = 0; i < joinTypes.length; i++) {
+                Double innerRows = subPlans[i].getInnerPlan().accept(this);
+                if (innerRows == null) {
+                    return null;
+                }
+                rows = join(rows, innerRows.doubleValue(), joinTypes[i]);
+            }
+            if (leftPlan instanceof AggregatePlan) {
+                // The aggregation of the left-hand plan is accounted for after the joins.
+                AggregatePlan aggregatePlan = (AggregatePlan) leftPlan;
+                double grouped = aggregate(rows, aggregatePlan.getGroupBy());
+                rows = filter(grouped, aggregatePlan.getHaving());
+            }
+            return limit(rows, leftPlan.getLimit());
+        } catch (SQLException ignored) {
+            // Deliberately swallowed: a failed estimate is reported as "unknown".
+            return null;
+        }
+    }
+
+    /**
+     * Estimates the output rows of a sort-merge join from the row counts of its two inputs
+     * and the join type.
+     *
+     * @param plan the sort-merge join plan
+     * @return the estimated row count, or null when either input's estimate is null
+     */
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        Double leftRows = plan.getLhsPlan().accept(this);
+        Double rightRows = plan.getRhsPlan().accept(this);
+        if (leftRows == null || rightRows == null) {
+            return null;
+        }
+
+        return join(leftRows.doubleValue(), rightRows.doubleValue(), plan.getJoinType());
+    }
+
+    /**
+     * Estimates the output rows of a union as the sum of its sub-plans' row counts (computed
+     * with {@code union(true, ...)}), followed by the plan's LIMIT.
+     *
+     * @param plan the union plan
+     * @return the estimated row count, or null as soon as one sub-plan's estimate is null
+     */
+    @Override
+    public Double visit(UnionPlan plan) {
+        List<QueryPlan> subPlans = plan.getSubPlans();
+        double[] childRows = new double[subPlans.size()];
+        for (int i = 0; i < childRows.length; i++) {
+            Double rows = subPlans.get(i).accept(this);
+            if (rows == null) {
+                return null;
+            }
+            childRows[i] = rows.doubleValue();
+        }
+
+        double combined = union(true, childRows);
+        return limit(combined, plan.getLimit());
+    }
+
+    /**
+     * Reports the row count of the wrapped plan unchanged.
+     *
+     * @param plan the unnest-array plan
+     * @return whatever this visitor returns for the plan's delegate (possibly null)
+     */
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        QueryPlan delegate = plan.getDelegate();
+        return delegate.accept(this);
+    }
+
+    /**
+     * Estimates the output rows of a correlate plan as the delegate's row count scaled by
+     * SEMI_OR_ANTI_JOIN_FACTOR.
+     *
+     * @param plan the correlate plan
+     * @return the scaled row count, or null when the delegate's estimate is null
+     */
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        Double delegateRows = plan.getDelegate().accept(this);
+        if (delegateRows == null) {
+            return null;
+        }
+
+        return delegateRows * SEMI_OR_ANTI_JOIN_FACTOR;
+    }
+
+    /**
+     * Reports the row count of the wrapped plan unchanged.
+     *
+     * @param plan the cursor-fetch plan
+     * @return whatever this visitor returns for the plan's delegate (possibly null)
+     */
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        QueryPlan delegate = plan.getDelegate();
+        return delegate.accept(this);
+    }
+
+ /**
+ * A list-jars plan is estimated at a constant zero rows.
+ *
+ * @param plan the list-jars plan (not inspected)
+ * @return always 0.0
+ */
+ @Override
+ public Double visit(ListJarsQueryPlan plan) {
+ return 0.0;
+ }
+
+ /**
+ * A trace plan is estimated at a constant zero rows.
+ *
+ * @param plan the trace plan (not inspected)
+ * @return always 0.0
+ */
+ @Override
+ public Double visit(TraceQueryPlan plan) {
+ return 0.0;
+ }
+
+    /**
+     * Reduces a scan filter to the parts that are BooleanExpressionFilters.
+     *
+     * A single filter is kept only if it is a BooleanExpressionFilter. A FilterList with the
+     * MUST_PASS_ALL operator is processed member by member, recursively; a FilterList with any
+     * other operator is returned untouched.
+     *
+     * @param filter the filter to reduce, may be null
+     * @return null if nothing remains, the sole surviving filter if exactly one remains,
+     *         otherwise a new MUST_PASS_ALL FilterList holding the surviving filters
+     */
+    public static Filter stripSkipScanFilter(Filter filter) {
+        if (filter == null) {
+            return null;
+        }
+        if (filter instanceof FilterList) {
+            FilterList filterList = (FilterList) filter;
+            if (filterList.getOperator() != FilterList.Operator.MUST_PASS_ALL) {
+                return filter;
+            }
+            List<Filter> kept = new ArrayList<>();
+            for (Filter member : filterList.getFilters()) {
+                Filter reduced = stripSkipScanFilter(member);
+                if (reduced != null) {
+                    kept.add(reduced);
+                }
+            }
+            if (kept.isEmpty()) {
+                return null;
+            }
+            if (kept.size() == 1) {
+                return kept.get(0);
+            }
+            return new FilterList(FilterList.Operator.MUST_PASS_ALL, kept);
+        }
+        if (filter instanceof BooleanExpressionFilter) {
+            return filter;
+        }
+        return null;
+    }
+
+
+ /*
+ * The methods below estimate the output row count from the input row count and the
+ * operator. They should be replaced by more accurate calculations based on histograms;
+ * a logical operator layer is expected to facilitate this.
+ */
+
+    /**
+     * Estimates the rows that survive an HBase filter using a fixed selectivity of one half.
+     *
+     * @param inputRows the number of input rows
+     * @param filter the filter applied to the rows, or null for none
+     * @return {@code inputRows} when there is no filter, otherwise half of it
+     */
+    public static double filter(double inputRows, Filter filter) {
+        return filter == null ? inputRows : 0.5 * inputRows;
+    }
+
+    /**
+     * Estimates the rows that survive a filter expression using a fixed selectivity of one
+     * half.
+     *
+     * @param inputRows the number of input rows
+     * @param filter the expression applied to the rows, or null for none
+     * @return {@code inputRows} when there is no expression, otherwise half of it
+     */
+    public static double filter(double inputRows, Expression filter) {
+        return filter == null ? inputRows : 0.5 * inputRows;
+    }
+
+    /**
+     * Estimates the rows produced by an aggregation.
+     *
+     * @param inputRows the number of input rows
+     * @param groupBy the compiled group-by; must not be null
+     * @return 1.0 for an ungrouped aggregate, otherwise {@code inputRows} scaled by
+     *         GROUPING_FACTOR
+     */
+    public static double aggregate(double inputRows, GroupByCompiler.GroupBy groupBy) {
+        return groupBy.isUngroupedAggregate() ? 1.0 : GROUPING_FACTOR * inputRows;
+    }
+
+    /**
+     * Estimates the rows that remain after a LIMIT.
+     *
+     * A LIMIT is an upper bound: it can only reduce the row count, so the result is the
+     * smaller of the input estimate and the limit, never the limit alone.
+     *
+     * @param inputRows the number of input rows
+     * @param limit the LIMIT value, or null when the query has none
+     * @return {@code inputRows} when there is no limit, otherwise
+     *         {@code min(inputRows, limit)}
+     */
+    public static double limit(double inputRows, Integer limit) {
+        if (limit == null) {
+            return inputRows;
+        }
+        return Math.min(inputRows, limit.doubleValue());
+    }
+
+    /**
+     * Estimates the rows produced by joining a left input with several right inputs in
+     * sequence, feeding the result of each join into the next one.
+     *
+     * @param lhsRows the row count of the left-most input
+     * @param rhsRows the row counts of the right inputs, in join order
+     * @param types the join type of each right input; same length as {@code rhsRows}
+     * @return the estimated row count after all joins
+     */
+    public static double join(double lhsRows, double[] rhsRows, JoinTableNode.JoinType[] types) {
+        assert rhsRows.length == types.length;
+        double accumulated = lhsRows;
+        int index = 0;
+        for (double rightRows : rhsRows) {
+            accumulated = join(accumulated, rightRows, types[index]);
+            index++;
+        }
+        return accumulated;
+    }
+
+    /**
+     * Estimates the rows produced by a single join.
+     *
+     * @param lhsRows the row count of the left input
+     * @param rhsRows the row count of the right input
+     * @param type the join type
+     * @return for an inner join, the smaller input scaled by INNER_JOIN_FACTOR; for
+     *         left/right/full joins, the larger input scaled by OUTER_JOIN_FACTOR; for semi
+     *         and anti joins, the left input scaled by SEMI_OR_ANTI_JOIN_FACTOR
+     * @throws IllegalArgumentException if the join type is none of the above
+     */
+    public static double join(double lhsRows, double rhsRows, JoinTableNode.JoinType type) {
+        switch (type) {
+            case Inner: {
+                double smaller = Math.min(lhsRows, rhsRows);
+                return smaller * INNER_JOIN_FACTOR;
+            }
+            case Left:
+            case Right:
+            case Full: {
+                double larger = Math.max(lhsRows, rhsRows);
+                return larger * OUTER_JOIN_FACTOR;
+            }
+            case Semi:
+            case Anti:
+                return lhsRows * SEMI_OR_ANTI_JOIN_FACTOR;
+            default:
+                throw new IllegalArgumentException("Invalid join type: " + type);
+        }
+    }
+
+    /**
+     * Estimates the rows produced by a union of several inputs.
+     *
+     * @param all true for UNION ALL semantics (plain sum); false to additionally scale the
+     *            sum by UNION_DISTINCT_FACTOR
+     * @param inputRows the row counts of the inputs
+     * @return the estimated row count of the union (0.0 when no inputs are given)
+     */
+    public static double union(boolean all, double... inputRows) {
+        double total = 0.0;
+        for (int i = 0; i < inputRows.length; i++) {
+            total += inputRows[i];
+        }
+        return all ? total : total * UNION_DISTINCT_FACTOR;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index da8beae..a55af6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -86,6 +86,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.exception.UpgradeRequiredException;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.iterate.MaterializedResultIterator;
@@ -731,6 +732,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ // Hand this plan to the visitor's generic fallback rather than a type-specific visit overload.
+ return visitor.defaultReturn(this);
+ }
+
+ @Override
public Long getEstimatedRowsToScan() {
return estimatedRows;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
index 1d4b8e0..db2b5ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
@@ -30,51 +30,52 @@ import org.apache.phoenix.query.QueryServices;
*/
public class CostUtil {
- // An estimate of the ratio of result data from group-by against the input data.
- private final static double GROUPING_FACTOR = 0.1;
-
- // Io operations conducted in intermediate evaluations like sorting or aggregation
- // should be counted twice since they usually involve both read and write.
- private final static double IO_COST_MULTIPLIER = 2.0;
-
/**
- * Estimate the number of output bytes of an aggregate.
- * @param byteCount the number of input bytes
+ * Estimate the cost of an aggregate.
+ * @param inputBytes the number of input bytes
+ * @param outputBytes the number of output bytes
* @param groupBy the compiled GroupBy object
- * @param aggregatorsSize the byte size of aggregators
- * @return the output byte count
+ * @param parallelLevel number of parallel workers or threads
+ * @return the cost
*/
- public static double estimateAggregateOutputBytes(
- double byteCount, GroupBy groupBy, int aggregatorsSize) {
- if (groupBy.isUngroupedAggregate()) {
- return aggregatorsSize;
- }
- return byteCount * GROUPING_FACTOR;
+    public static Cost estimateAggregateCost(
+            double inputBytes, double outputBytes, GroupBy groupBy, int parallelLevel) {
+        // Clamp before taking the logarithm: Math.log is negative below 1 and -Infinity at 0,
+        // which would produce a negative or infinite cost. The order-by and hash-join
+        // estimates in this class apply the same lower bound to their log argument.
+        if (inputBytes < 1) {
+            inputBytes = 1;
+        }
+        // Order-preserving and ungrouped aggregates are charged a constant overhead of 1;
+        // otherwise the overhead is outputBytes, floored at 1.
+        double hashMapOverhead = groupBy.isOrderPreserving() || groupBy.isUngroupedAggregate()
+                ? 1 : (outputBytes < 1 ? 1 : outputBytes);
+        return new Cost(0, 0, (outputBytes + hashMapOverhead * Math.log(inputBytes)) / parallelLevel);
}
/**
- * Estimate the cost of an aggregate.
- * @param byteCount the number of input bytes
- * @param groupBy the compiled GroupBy object
- * @param aggregatorsSize the byte size of aggregators
+ * Estimate the cost of an order-by
+ * @param inputBytes the number of input bytes
+ * @param outputBytes the number of output bytes, which may be different from inputBytes
+ * depending on whether there is a LIMIT
* @param parallelLevel number of parallel workers or threads
* @return the cost
*/
- public static Cost estimateAggregateCost(
- double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) {
- double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize);
- double orderedFactor = groupBy.isOrderPreserving() ? 0.2 : 1.0;
- return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel);
+    public static Cost estimateOrderByCost(double inputBytes, double outputBytes, int parallelLevel) {
+        // Floor the log argument at 1 so the logarithmic term is never negative.
+        double logArgument = inputBytes < 1 ? 1 : inputBytes;
+        double sortTerm = outputBytes * Math.log(logArgument);
+        return new Cost(0, 0, (outputBytes + sortTerm) / parallelLevel);
}
/**
- * Estimate the cost of an order-by
- * @param byteCount the number of input bytes
+ * Estimate the cost of a hash-join
+ * @param lhsBytes the number of left input bytes
+ * @param rhsBytes the number of right input bytes
+ * @param outputBytes the number of output bytes
+ * @param hasKeyRangeExpression when true, the left input bytes are not charged to the cost
* @param parallelLevel number of parallel workers or threads
* @return the cost
*/
- public static Cost estimateOrderByCost(double byteCount, int parallelLevel) {
- return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel);
+    public static Cost estimateHashJoinCost(
+            double lhsBytes, double rhsBytes, double outputBytes,
+            boolean hasKeyRangeExpression, int parallelLevel) {
+        // Floor the right-hand size at 1 so the n*log(n) term is never negative.
+        double rightBytes = rhsBytes < 1 ? 1 : rhsBytes;
+        double rightTerm = rightBytes * Math.log(rightBytes);
+        // The left input is only charged when there is no key-range expression.
+        double leftTerm = hasKeyRangeExpression ? 0 : lhsBytes;
+        // Only the input terms are divided by the parallel level; output bytes are added whole.
+        return new Cost(0, 0, (rightTerm + leftTerm) / parallelLevel + outputBytes);
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 1903dda..69aeaad 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.iterate.ParallelIterators;
import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -474,6 +475,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ // Hand this plan to the visitor's generic fallback rather than a type-specific visit overload.
+ return visitor.defaultReturn(this);
+ }
+
+ @Override
public Long getEstimatedRowsToScan() {
return null;
}