You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/25 21:39:15 UTC
[12/12] phoenix git commit: Temporary fix for PHOENIX-2714
Temporary fix for PHOENIX-2714
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/36ee23ab
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/36ee23ab
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/36ee23ab
Branch: refs/heads/calcite
Commit: 36ee23ab7760616394f32f88b2da6b038cfee1e9
Parents: 06fd581
Author: maryannxue <ma...@gmail.com>
Authored: Thu Feb 25 15:38:55 2016 -0500
Committer: maryannxue <ma...@gmail.com>
Committed: Thu Feb 25 15:38:55 2016 -0500
----------------------------------------------------------------------
.../phoenix/calcite/rel/PhoenixTableScan.java | 4 +-
.../apache/phoenix/execute/AggregatePlan.java | 2 -
.../apache/phoenix/execute/BaseQueryPlan.java | 11 --
.../org/apache/phoenix/execute/ScanPlan.java | 2 -
.../phoenix/iterate/BaseResultIterators.java | 154 ++++++++++++++++---
5 files changed, 137 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index 80c328f..dfdb507 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.execute.ScanPlan;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.iterate.BaseResultIterators;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.SelectStatement;
@@ -131,8 +132,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr);
WhereCompiler.setScanFilter(context, select, filterExpr, true, false);
scanRanges = context.getScanRanges();
- ScanPlan plan = new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, true, null);
- estimatedSize = plan.getEstimatedBytes();
+ estimatedSize = BaseResultIterators.getEstimatedCount(context, tableRef.getTable()).getSecond();
} catch (SQLException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index f9eca0c..794247b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -208,8 +208,6 @@ public class AggregatePlan extends BaseQueryPlan {
splits = iterators.getSplits();
scans = iterators.getScans();
- estimatedSize = iterators.getEstimatedByteCount();
- estimatedRows = iterators.getEstimatedRowCount();
AggregatingResultIterator aggResultIterator;
// No need to merge sort for ungrouped aggregation
http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 0ee70ba..f389fd0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -108,8 +108,6 @@ public abstract class BaseQueryPlan implements QueryPlan {
* immediately before creating the ResultIterator.
*/
protected final Expression dynamicFilter;
- protected Long estimatedRows;
- protected Long estimatedSize;
protected BaseQueryPlan(
@@ -129,16 +127,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
this.parallelIteratorFactory = parallelIteratorFactory;
this.dynamicFilter = dynamicFilter;
}
-
- public Long getEstimatedRowCount() {
- return this.estimatedRows;
- }
- public Long getEstimatedByteCount() {
- return this.estimatedSize;
- }
-
-
@Override
public Operation getOperation() {
return Operation.QUERY;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 72d95be..23bf435 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -208,8 +208,6 @@ public class ScanPlan extends BaseQueryPlan {
}
splits = iterators.getSplits();
scans = iterators.getScans();
- estimatedSize = iterators.getEstimatedByteCount();
- estimatedRows = iterators.getEstimatedRowCount();
if (isOrdered) {
scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index a48de13..22c0197 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -113,7 +113,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private final List<List<Scan>> scans;
private final List<KeyRange> splits;
- private final PTableStats tableStats;
private final byte[] physicalTableName;
private final QueryPlan plan;
protected final String scanId;
@@ -136,7 +135,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return plan.getTableRef().getTable();
}
- private boolean useStats() {
+ private static boolean useStats(StatementContext context) {
Scan scan = context.getScan();
boolean isPointLookup = context.getScanRanges().isPointLookup();
/*
@@ -337,7 +336,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
physicalTableName = table.getPhysicalName().getBytes();
- tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
// Used to tie all the scans together during logging
scanId = UUID.randomUUID().toString();
@@ -397,18 +395,18 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return guideIndex;
}
- private GuidePostsInfo getGuidePosts(Set<byte[]> whereConditions) {
+ private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions) throws SQLException {
/*
* Don't use guide posts if: 1) We're doing a point lookup, as HBase is fast enough at those to not need them to
* be further parallelized. TODO: pref test to verify 2) We're collecting stats, as in this case we need to scan
* entire regions worth of data to track where to put the guide posts.
*/
- if (!useStats()) { return GuidePostsInfo.NO_GUIDEPOST; }
+ if (!useStats(context)) { return GuidePostsInfo.NO_GUIDEPOST; }
GuidePostsInfo gps = null;
- PTable table = getTable();
+ PTableStats tableStats = new MetaDataClient(context.getConnection()).getTableStats(table);
Map<byte[], GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
- byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
+ byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
if (table.getColumnFamilies().isEmpty()) {
// For sure we can get the defaultCF from the table
gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF);
@@ -430,7 +428,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return gps;
}
- private GuidePostsInfo getDefaultFamilyGuidePosts(Map<byte[], GuidePostsInfo> guidePostMap, byte[] defaultCF) {
+ private static GuidePostsInfo getDefaultFamilyGuidePosts(Map<byte[], GuidePostsInfo> guidePostMap, byte[] defaultCF) {
if (guidePostMap.get(defaultCF) != null) {
return guidePostMap.get(defaultCF);
}
@@ -491,7 +489,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
whereConditions.add(cf);
}
}
- GuidePostsInfo gps = getGuidePosts(whereConditions);
+ GuidePostsInfo gps = getGuidePosts(context, table, whereConditions);
hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
boolean traverseAllRegions = isSalted || isLocalIndex;
if (!traverseAllRegions) {
@@ -566,8 +564,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
false);
- estimatedRows += gps.getRowCounts().get(guideIndex);
- estimatedSize += gps.getByteCounts().get(guideIndex);
+ if (newScan != null) {
+ estimatedRows += gps.getRowCounts().get(guideIndex);
+ estimatedSize += gps.getByteCounts().get(guideIndex);
+ }
scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
currentKeyBytes = currentGuidePost.copyBytes();
currentGuidePost = PrefixByteCodec.decode(decoder, input);
@@ -603,6 +603,130 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return parallelScans;
}
+
+ /**
+ * Compute the estimated count of rows and bytes that will be scanned.
+ * <p>
+ * The estimate is derived from the guide posts returned by {@code getGuidePosts}: the method
+ * walks the regions from the one located for the start key through the one located for the
+ * stop key and, for every guide post whose key range yields a non-null result from
+ * {@code ScanRanges.intersectScan}, adds that guide post's row count and byte count to the
+ * running totals. No scan is executed here; the intersected scan is only tested for null.
+ * <p>
+ * Special cases:
+ * <ul>
+ * <li>a table whose name is {@code null} yields a pair of {@code null}s;</li>
+ * <li>a point lookup yields one row and {@code SchemaUtil.estimateRowSize(table)} bytes;</li>
+ * <li>when {@code getGuidePosts} returns {@code GuidePostsInfo.NO_GUIDEPOST} the result is a
+ * pair of {@code null}s.</li>
+ * </ul>
+ * @param context the statement context supplying the scan, the scan ranges, the
+ * where-condition columns and the connection
+ * @param table the table whose scan is being estimated
+ * @return the estimated row count and the byte count.
+ * The row count is the first element and the byte count is the second element of the pair;
+ * both are {@code null} when no estimate can be made.
+ * @throws SQLException propagated from the guide post lookup ({@code getGuidePosts});
+ * NOTE(review): the region lookup presumably can raise it as well - confirm
+ */
+ public static Pair<Long, Long> getEstimatedCount(StatementContext context, PTable table) throws SQLException {
+ if (table.getName() == null) { // empty table
+ return new Pair<Long, Long>(null, null);
+ }
+
+ if (context.getScanRanges().isPointLookup()) { // point lookup: report one row of estimated row size
+ return new Pair<Long, Long>(1L, SchemaUtil.estimateRowSize(table));
+ }
+
+ TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); // byte[] keys need an explicit comparator
+ for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
+ byte[] cf = where.getFirst(); // first element of each pair is treated as the column family
+ if (cf != null) {
+ whereConditions.add(cf);
+ }
+ }
+ GuidePostsInfo gps = getGuidePosts(context, table, whereConditions);
+ if (gps == GuidePostsInfo.NO_GUIDEPOST) { // no guide posts available: no estimate can be made
+ return new Pair<Long, Long>(null, null);
+ }
+
+ byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
+ byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+ Scan scan = context.getScan();
+ List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+ .getAllTableRegions(table.getPhysicalName().getBytes());
+ List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+ ScanRanges scanRanges = context.getScanRanges();
+ boolean isSalted = table.getBucketNum() != null;
+ boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
+ boolean traverseAllRegions = isSalted || isLocalIndex; // salted tables and local indexes keep the empty start/stop keys
+ if (!traverseAllRegions) { // otherwise narrow the key range to the scan's own start/stop rows
+ byte[] scanStartRow = scan.getStartRow();
+ if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
+ startKey = scanStartRow;
+ }
+ byte[] scanStopRow = scan.getStopRow();
+ if (stopKey.length == 0 // NOTE(review): stopKey is still the initial empty array here, so this looks always true - confirm
+ || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
+ stopKey = scanStopRow;
+ }
+ }
+
+ int regionIndex = 0;
+ int stopIndex = regionBoundaries.size();
+ if (startKey.length > 0) { // a non-empty start key selects the first region to visit
+ regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+ }
+ if (stopKey.length > 0) { // a non-empty stop key selects the last region to visit
+ stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+ if (isLocalIndex) {
+ stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey(); // replace the stop key with the end key of the region at stopIndex
+ }
+ }
+
+ ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
+
+ int gpsSize = gps.getGuidePostsCount();
+ int keyOffset = 0;
+ ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
+ ImmutableBytesWritable guidePosts = gps.getGuidePosts();
+ ByteArrayInputStream stream = null;
+ DataInput input = null;
+ PrefixByteDecoder decoder = null;
+ int guideIndex = 0; // index of the guide post currently held in currentGuidePost
+ long estimatedRows = 0;
+ long estimatedSize = 0;
+ try {
+ if (gpsSize > 0) {
+ stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
+ input = new DataInputStream(stream);
+ decoder = new PrefixByteDecoder(gps.getMaxLength());
+ try {
+ while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0 // skip guide posts at or before the start key
+ && currentKey.getLength() != 0) { // an empty start key skips none
+ guideIndex++;
+ }
+ } catch (EOFException e) {} // NOTE(review): presumably EOF only means the guide posts are exhausted - confirm
+ }
+ byte[] currentKeyBytes = currentKey.copyBytes();
+
+ // Merge bisect with guideposts for all but the last region
+ while (regionIndex <= stopIndex) {
+ byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
+ byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
+ if (regionIndex == stopIndex) { // last region visited: bounded by the stop key rather than a region boundary
+ endKey = stopKey;
+ } else {
+ endKey = regionBoundaries.get(regionIndex);
+ }
+ HRegionLocation regionLocation = regionLocations.get(regionIndex);
+ if (isLocalIndex) { // local indexes recompute the row key offset from each region's start and end keys
+ HRegionInfo regionInfo = regionLocation.getRegionInfo();
+ endRegionKey = regionInfo.getEndKey();
+ keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
+ }
+ try {
+ while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) { // an empty end key imposes no upper bound
+ Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
+ false);
+ if (newScan != null) { // count only guide posts for which an intersected scan exists
+ estimatedRows += gps.getRowCounts().get(guideIndex);
+ estimatedSize += gps.getByteCounts().get(guideIndex);
+ }
+ currentKeyBytes = currentGuidePost.copyBytes(); // next range starts at the guide post just handled
+ currentGuidePost = PrefixByteCodec.decode(decoder, input);
+ currentGuidePostBytes = currentGuidePost.copyBytes();
+ guideIndex++;
+ }
+ } catch (EOFException e) {} // NOTE(review): presumably EOF only means the guide posts are exhausted - confirm
+ currentKeyBytes = endKey; // the next region's range starts where this one ended
+ regionIndex++;
+ }
+ } finally {
+ if (stream != null) Closeables.closeQuietly(stream); // stream is only non-null when guide posts were decoded
+ }
+ return new Pair<Long, Long>(estimatedRows, estimatedSize);
+ }
public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
if (!reverse) {
@@ -884,14 +1008,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
buf.append(getName()).append(" ").append(size()).append("-WAY ");
explain(buf.toString(),planSteps);
}
-
- public Long getEstimatedRowCount() {
- return this.estimatedRows;
- }
-
- public Long getEstimatedByteCount() {
- return this.estimatedSize;
- }
@Override
public String toString() {