You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/09/27 02:22:30 UTC
[1/2] phoenix git commit: PHOENIX-4007 Surface time at which byte/row
estimate information was computed in explain plan output
Repository: phoenix
Updated Branches:
refs/heads/master 176f541ce -> 6d8357e90
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index a3a9762..f037a20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
@@ -149,6 +148,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
private Long estimatedRows;
private Long estimatedSize;
+ private Long estimateInfoTimestamp;
private boolean hasGuidePosts;
private Scan scan;
private boolean useStatsForParallelization;
@@ -679,7 +679,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
boolean isSalted = table.getBucketNum() != null;
boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
GuidePostsInfo gps = getGuidePosts();
+ // Case when stats were never collected
hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
+ // Case when stats collection did run but there possibly wasn't enough data. In such a
+ // case we generate an empty guide post with the byte estimate being set as guide post
+ // width.
+ boolean emptyGuidePost = gps.isEmptyGuidePost();
boolean traverseAllRegions = isSalted || isLocalIndex;
if (!traverseAllRegions) {
byte[] scanStartRow = scan.getStartRow();
@@ -720,6 +725,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
int guideIndex = 0;
long estimatedRows = 0;
long estimatedSize = 0;
+ long estimateTs = Long.MAX_VALUE;
+ long minGuidePostTimestamp = Long.MAX_VALUE;
try {
if (gpsSize > 0) {
stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
@@ -728,11 +735,21 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
try {
while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0
&& currentKey.getLength() != 0) {
+ minGuidePostTimestamp = Math.min(estimateTs,
+ gps.getGuidePostTimestamps()[guideIndex]);
guideIndex++;
}
- } catch (EOFException e) {}
+ } catch (EOFException e) {
+ // expected. Thrown when we have decoded all guide posts.
+ }
}
byte[] currentKeyBytes = currentKey.copyBytes();
+ boolean intersectWithGuidePosts = guideIndex < gpsSize;
+ if (!intersectWithGuidePosts) {
+ // If there are no guide posts within the query range, we use the minimum time
+ // across all guideposts as the estimateInfoTimestamp
+ estimateTs = minGuidePostTimestamp;
+ }
// Merge bisect with guideposts for all but the last region
while (regionIndex <= stopIndex) {
HRegionLocation regionLocation = regionLocations.get(regionIndex);
@@ -748,25 +765,37 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
endRegionKey = regionInfo.getEndKey();
keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
}
- try {
- while (guideIndex < gpsSize && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) {
- Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
- false);
- if (newScan != null) {
- ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
- regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
- estimatedRows += gps.getRowCounts()[guideIndex];
- estimatedSize += gps.getByteCounts()[guideIndex];
- }
- if (useStatsForParallelization) {
- scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
- }
- currentKeyBytes = currentGuidePostBytes;
+ while (intersectWithGuidePosts && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) {
+ Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
+ false);
+ if (newScan != null) {
+ ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
+ regionInfo.getStartKey(), regionInfo.getEndKey(),
+ newScan.getStartRow(), newScan.getStopRow());
+ estimatedRows += gps.getRowCounts()[guideIndex];
+ estimatedSize += gps.getByteCounts()[guideIndex];
+ }
+ if (useStatsForParallelization) {
+ scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
+ }
+ currentKeyBytes = currentGuidePostBytes;
+ try {
currentGuidePost = PrefixByteCodec.decode(decoder, input);
currentGuidePostBytes = currentGuidePost.copyBytes();
+ /*
+ * It is possible that the timestamp of guideposts could be different.
+ * So we report the time at which stats information was collected as the
+ * minimum of timestamp of the guideposts that we will be going over.
+ */
+ estimateTs =
+ Math.min(estimateTs,
+ gps.getGuidePostTimestamps()[guideIndex]);
guideIndex++;
+ } catch (EOFException e) {
+ // We have read all guide posts
+ intersectWithGuidePosts = false;
}
- } catch (EOFException e) {}
+ }
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true);
if(newScan != null) {
ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
@@ -779,12 +808,21 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if (scanRanges.isPointLookup()) {
this.estimatedRows = Long.valueOf(scanRanges.getPointLookupCount());
this.estimatedSize = this.estimatedRows * SchemaUtil.estimateRowSize(table);
+ this.estimateInfoTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+ } else if (emptyGuidePost) {
+ // In case of an empty guide post, we estimate the number of rows scanned by
+ // using the estimated row size
+ this.estimatedRows = (gps.getByteCounts()[0] / SchemaUtil.estimateRowSize(table));
+ this.estimatedSize = gps.getByteCounts()[0];
+ this.estimateInfoTimestamp = gps.getGuidePostTimestamps()[0];
} else if (hasGuidePosts) {
this.estimatedRows = estimatedRows;
this.estimatedSize = estimatedSize;
+ this.estimateInfoTimestamp = estimateTs;
} else {
this.estimatedRows = null;
this.estimatedSize = null;
+ this.estimateInfoTimestamp = null;
}
if (!scans.isEmpty()) { // Add any remaining scans
parallelScans.add(scans);
@@ -1171,4 +1209,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return "ResultIterators [name=" + getName() + ",id=" + scanId + ",scans=" + scans + "]";
}
+ public Long getEstimateInfoTimestamp() {
+ return this.estimateInfoTimestamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 9f1360a..d35cce1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
@@ -508,25 +509,33 @@ public class PhoenixStatement implements Statement, SQLCloseable {
private static final String EXPLAIN_PLAN_BYTES_ESTIMATE_COLUMN_NAME = "BytesEstimate";
private static final byte[] EXPLAIN_PLAN_BYTES_ESTIMATE =
PVarchar.INSTANCE.toBytes(EXPLAIN_PLAN_BYTES_ESTIMATE_COLUMN_NAME);
- private static final String EXPLAIN_PLAN_ROWS_ESTIMATE_COLUMN_NAME = "RowsEstimate";
- private static final byte[] EXPLAIN_PLAN_ROWS_ESTIMATE =
- PVarchar.INSTANCE.toBytes(EXPLAIN_PLAN_ROWS_ESTIMATE_COLUMN_NAME);
-
public static final String EXPLAIN_PLAN_BYTES_ESTIMATE_COLUMN_ALIAS = "EST_BYTES_READ";
- public static final String EXPLAIN_PLAN_ROWS_COLUMN_ALIAS = "EST_ROWS_READ";
-
private static final PColumnImpl EXPLAIN_PLAN_BYTES_ESTIMATE_COLUMN =
new PColumnImpl(PNameFactory.newName(EXPLAIN_PLAN_BYTES_ESTIMATE),
PNameFactory.newName(EXPLAIN_PLAN_FAMILY), PLong.INSTANCE, null, null, false, 1,
SortOrder.getDefault(), 0, null, false, null, false, false,
EXPLAIN_PLAN_BYTES_ESTIMATE);
+ private static final String EXPLAIN_PLAN_ROWS_ESTIMATE_COLUMN_NAME = "RowsEstimate";
+ private static final byte[] EXPLAIN_PLAN_ROWS_ESTIMATE =
+ PVarchar.INSTANCE.toBytes(EXPLAIN_PLAN_ROWS_ESTIMATE_COLUMN_NAME);
+ public static final String EXPLAIN_PLAN_ROWS_COLUMN_ALIAS = "EST_ROWS_READ";
private static final PColumnImpl EXPLAIN_PLAN_ROWS_ESTIMATE_COLUMN =
new PColumnImpl(PNameFactory.newName(EXPLAIN_PLAN_ROWS_ESTIMATE),
PNameFactory.newName(EXPLAIN_PLAN_FAMILY), PLong.INSTANCE, null, null, false, 2,
SortOrder.getDefault(), 0, null, false, null, false, false,
EXPLAIN_PLAN_ROWS_ESTIMATE);
+ private static final String EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN_NAME = "EstimateInfoTS";
+ private static final byte[] EXPLAIN_PLAN_ESTIMATE_INFO_TS =
+ PVarchar.INSTANCE.toBytes(EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN_NAME);
+ public static final String EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN_ALIAS = "EST_INFO_TS";
+ private static final PColumnImpl EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN =
+ new PColumnImpl(PNameFactory.newName(EXPLAIN_PLAN_ESTIMATE_INFO_TS),
+ PNameFactory.newName(EXPLAIN_PLAN_FAMILY), PLong.INSTANCE, null, null, false, 3,
+ SortOrder.getDefault(), 0, null, false, null, false, false,
+ EXPLAIN_PLAN_ESTIMATE_INFO_TS);
+
private static final RowProjector EXPLAIN_PLAN_ROW_PROJECTOR_WITH_BYTE_ROW_ESTIMATES =
new RowProjector(Arrays
.<ColumnProjector> asList(
@@ -535,12 +544,17 @@ public class PhoenixStatement implements Statement, SQLCloseable {
new RowKeyValueAccessor(Collections
.<PDatum> singletonList(EXPLAIN_PLAN_DATUM), 0)),
false),
- new ExpressionProjector(
- EXPLAIN_PLAN_BYTES_ESTIMATE_COLUMN_ALIAS, EXPLAIN_PLAN_TABLE_NAME,
- new KeyValueColumnExpression(EXPLAIN_PLAN_BYTES_ESTIMATE_COLUMN), false),
- new ExpressionProjector(EXPLAIN_PLAN_ROWS_COLUMN_ALIAS,
+ new ExpressionProjector(EXPLAIN_PLAN_BYTES_ESTIMATE_COLUMN_ALIAS,
EXPLAIN_PLAN_TABLE_NAME, new KeyValueColumnExpression(
- EXPLAIN_PLAN_ROWS_ESTIMATE_COLUMN),
+ EXPLAIN_PLAN_BYTES_ESTIMATE_COLUMN),
+ false),
+ new ExpressionProjector(EXPLAIN_PLAN_ROWS_COLUMN_ALIAS,
+ EXPLAIN_PLAN_TABLE_NAME,
+ new KeyValueColumnExpression(EXPLAIN_PLAN_ROWS_ESTIMATE_COLUMN),
+ false),
+ new ExpressionProjector(EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN_ALIAS,
+ EXPLAIN_PLAN_TABLE_NAME,
+ new KeyValueColumnExpression(EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN),
false)),
0, true);
@@ -569,6 +583,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
List<Tuple> tuples = Lists.newArrayListWithExpectedSize(planSteps.size());
Long estimatedBytesToScan = plan.getEstimatedBytesToScan();
Long estimatedRowsToScan = plan.getEstimatedRowsToScan();
+ Long estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
for (String planStep : planSteps) {
byte[] row = PVarchar.INSTANCE.toBytes(planStep);
List<Cell> cells = Lists.newArrayListWithCapacity(3);
@@ -584,11 +599,18 @@ public class PhoenixStatement implements Statement, SQLCloseable {
MetaDataProtocol.MIN_TABLE_TIMESTAMP,
PLong.INSTANCE.toBytes(estimatedRowsToScan)));
}
+ if (estimateInfoTimestamp != null) {
+ cells.add(KeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_ESTIMATE_INFO_TS,
+ MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+ PLong.INSTANCE.toBytes(estimateInfoTimestamp)));
+ }
+ Collections.sort(cells, KeyValue.COMPARATOR);
Tuple tuple = new MultiKeyValueTuple(cells);
tuples.add(tuple);
}
final Long estimatedBytes = estimatedBytesToScan;
final Long estimatedRows = estimatedRowsToScan;
+ final Long estimateTs = estimateInfoTimestamp;
final ResultIterator iterator = new MaterializedResultIterator(tuples);
return new QueryPlan() {
@@ -706,6 +728,10 @@ public class PhoenixStatement implements Statement, SQLCloseable {
return estimatedBytes;
}
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return estimateTs;
+ }
};
}
}
@@ -1315,6 +1341,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
public Long getEstimatedBytesToScan() throws SQLException {
return 0l;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
index b8ba759..6e56229 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -48,9 +48,9 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TimeKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,18 +62,19 @@ import com.google.common.collect.Maps;
class DefaultStatisticsCollector implements StatisticsCollector {
private static final Logger logger = LoggerFactory.getLogger(DefaultStatisticsCollector.class);
private final Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap();
- private final StatisticsWriter statsWriter;
+ private StatisticsWriter statsWriter;
private final Pair<Long, GuidePostsInfoBuilder> cachedGuidePosts;
private final byte[] guidePostWidthBytes;
private final byte[] guidePostPerRegionBytes;
// Where to look for GUIDE_POSTS_WIDTH in SYSTEM.CATALOG
private final byte[] ptableKey;
private final RegionCoprocessorEnvironment env;
-
private long guidePostDepth;
private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
private static final Log LOG = LogFactory.getLog(DefaultStatisticsCollector.class);
private ImmutableBytesWritable currentRow;
+ private final long clientTimeStamp;
+ private final String tableName;
DefaultStatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family,
byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException {
@@ -96,9 +97,8 @@ class DefaultStatisticsCollector implements StatisticsCollector {
pName = MetaDataUtil.getViewIndexUserTableName(tableName);
}
ptableKey = SchemaUtil.getTableKeyFromFullName(pName);
- // Get the stats table associated with the current table on which the CP is
- // triggered
- this.statsWriter = StatisticsWriter.newWriter(env, tableName, clientTimeStamp);
+ this.clientTimeStamp = clientTimeStamp;
+ this.tableName = tableName;
// in a compaction we know the one family ahead of time
if (family != null) {
ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family);
@@ -121,14 +121,14 @@ class DefaultStatisticsCollector implements StatisticsCollector {
guidepostWidth = PLong.INSTANCE.getCodec().decodeInt(guidePostWidthBytes, 0, SortOrder.getDefault());
}
this.guidePostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
- env.getRegion().getTableDesc());
+ env.getRegion().getTableDesc());
} else {
long guidepostWidth = -1;
HTableInterface htable = null;
try {
// Next check for GUIDE_POST_WIDTH on table
htable = env.getTable(
- SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
+ SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
Get get = new Get(ptableKey);
get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
Result result = htable.get(get);
@@ -151,13 +151,13 @@ class DefaultStatisticsCollector implements StatisticsCollector {
// Last use global config value
Configuration config = env.getConfiguration();
this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
- config.getInt(
- QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
- config.getLong(
- QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
- env.getRegion().getTableDesc());
+ config.getInt(
+ QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
+ config.getLong(
+ QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
+ env.getRegion().getTableDesc());
}
}
}
@@ -176,7 +176,7 @@ class DefaultStatisticsCollector implements StatisticsCollector {
public void updateStatistic(Region region, Scan scan) {
try {
ArrayList<Mutation> mutations = new ArrayList<Mutation>();
- writeStatistics(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime(), scan);
+ writeStatistics(region, true, mutations, EnvironmentEdgeManager.currentTimeMillis(), scan);
if (logger.isDebugEnabled()) {
logger.debug("Committing new stats for the region " + region.getRegionInfo());
}
@@ -217,7 +217,7 @@ class DefaultStatisticsCollector implements StatisticsCollector {
if (logger.isDebugEnabled()) {
logger.debug("Deleting the stats for the region " + region.getRegionInfo());
}
- statsWriter.deleteStats(region, this, fam, mutations);
+ statsWriter.deleteStatsForRegion(region, this, fam, mutations);
}
if (logger.isDebugEnabled()) {
logger.debug("Adding new stats for the region " + region.getRegionInfo());
@@ -291,7 +291,7 @@ class DefaultStatisticsCollector implements StatisticsCollector {
long byteCount = gps.getFirst() + kvLength;
gps.setFirst(byteCount);
if (byteCount >= guidePostDepth) {
- if (gps.getSecond().addGuidePosts(row, byteCount, gps.getSecond().getRowCount())) {
+ if (gps.getSecond().addGuidePostOnCollection(row, byteCount, gps.getSecond().getRowCount())) {
gps.setFirst(0l);
gps.getSecond().resetRowCount();
}
@@ -307,21 +307,17 @@ class DefaultStatisticsCollector implements StatisticsCollector {
logger.debug("Compaction scanner created for stats");
}
ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
- return getInternalScanner(env, s, cfKey);
- }
-
- private InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, InternalScanner internalScan,
- ImmutableBytesPtr family) throws IOException {
- StatisticsScanner scanner = new StatisticsScanner(this, statsWriter, env, internalScan, family);
- // We need to initialize the scanner synchronously and potentially perform a cross region Get
- // in order to use the correct guide posts width for the table being compacted.
+ // Potentially perform a cross region server get in order to use the correct guide posts
+ // width for the table being compacted.
init();
+ StatisticsScanner scanner = new StatisticsScanner(this, statsWriter, env, s, cfKey);
return scanner;
}
@Override
public void init() throws IOException {
initGuidepostDepth();
+ this.statsWriter = StatisticsWriter.newWriter(env, tableName, clientTimeStamp, guidePostDepth);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
index 43fe7f3..04c69bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
@@ -17,10 +17,11 @@
*/
package org.apache.phoenix.schema.stats;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.SizedUtil;
@@ -30,21 +31,18 @@ import com.google.common.primitives.Longs;
* guidePosts of different regions when the GuidePostsInfo is formed for a table.
*/
public class GuidePostsInfo {
- public final static GuidePostsInfo EMPTY_GUIDEPOST = new GuidePostsInfo(new ArrayList<Long>(),
- new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY), new ArrayList<Long>(), 0, 0) {
- @Override
- public int getEstimatedSize() {
- return 0;
- }
- };
- public final static GuidePostsInfo NO_GUIDEPOST = new GuidePostsInfo(new ArrayList<Long>(),
- new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY), new ArrayList<Long>(), 0, 0) {
- @Override
- public int getEstimatedSize() {
- return 0;
- }
- };
-
+ public final static GuidePostsInfo NO_GUIDEPOST =
+ new GuidePostsInfo(Collections.<Long> emptyList(),
+ new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY),
+ Collections.<Long> emptyList(), 0, 0, Collections.<Long> emptyList()) {
+ @Override
+ public int getEstimatedSize() {
+ return 0;
+ }
+ };
+
+ public final static byte[] EMPTY_GUIDEPOST_KEY = ByteUtil.EMPTY_BYTE_ARRAY;
+
/**
* the total number of guidePosts for the table combining all the guidePosts per region per cf.
*/
@@ -69,6 +67,10 @@ public class GuidePostsInfo {
* Estimate of byte size of this instance
*/
private final int estimatedSize;
+ /**
+ * The timestamps at which guideposts were created/updated
+ */
+ private final long[] gpTimestamps;
/**
* Constructor that creates GuidePostsInfo per region
@@ -83,9 +85,11 @@ public class GuidePostsInfo {
* Maximum length of a guidePost collected
* @param guidePostsCount
* Number of guidePosts
+ * @param gpTimestamps
+ * Times at which guidePosts were updated/created
*/
public GuidePostsInfo(List<Long> byteCounts, ImmutableBytesWritable guidePosts, List<Long> rowCounts, int maxLength,
- int guidePostsCount) {
+ int guidePostsCount, List<Long> updateTimes) {
this.guidePosts = new ImmutableBytesWritable(guidePosts);
this.maxLength = maxLength;
this.guidePostsCount = guidePostsCount;
@@ -99,6 +103,7 @@ public class GuidePostsInfo {
+ SizedUtil.ARRAY_SIZE + this.byteCounts.length * SizedUtil.LONG_SIZE // byteCounts
+ SizedUtil.INT_SIZE; // estimatedSize
this.estimatedSize = estimatedSize;
+ this.gpTimestamps = Longs.toArray(updateTimes);
}
public ImmutableBytesWritable getGuidePosts() {
@@ -121,7 +126,27 @@ public class GuidePostsInfo {
return byteCounts;
}
+ public long[] getGuidePostTimestamps() {
+ return gpTimestamps;
+ }
+
public int getEstimatedSize() {
return estimatedSize;
}
+
+ public boolean isEmptyGuidePost() {
+ return guidePosts.equals(EMPTY_GUIDEPOST_KEY) && guidePostsCount == 0
+ && byteCounts.length == 1 && gpTimestamps.length == 1;
+ }
+
+ public static GuidePostsInfo createEmptyGuidePost(long byteCount, long guidePostUpdateTime) {
+ return new GuidePostsInfo(Collections.singletonList(byteCount),
+ new ImmutableBytesWritable(EMPTY_GUIDEPOST_KEY), Collections.<Long> emptyList(), 0,
+ 0, Collections.<Long> singletonList(guidePostUpdateTime));
+ }
+
+ public static boolean isEmptyGpsKey(byte[] key) {
+ return Bytes.equals(key, GuidePostsInfo.EMPTY_GUIDEPOST_KEY);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java
index 246ef6c..2b82936 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java
@@ -50,6 +50,7 @@ public class GuidePostsInfoBuilder {
private TrustedByteArrayOutputStream stream;
private List<Long> rowCounts = new ArrayList<Long>();
private List<Long> byteCounts = new ArrayList<Long>();
+ private List<Long> guidePostsTimestamps = new ArrayList<Long>();
public boolean isEmpty() {
return rowCounts.size() == 0;
@@ -63,6 +64,10 @@ public class GuidePostsInfoBuilder {
return byteCounts;
}
+ public List<Long> getGuidePostsTimestamps() {
+ return guidePostsTimestamps;
+ }
+
public int getMaxLength() {
return maxLength;
}
@@ -73,20 +78,31 @@ public class GuidePostsInfoBuilder {
lastRow = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
}
+ public boolean addGuidePostOnCollection(ImmutableBytesWritable row, long byteCount,
+ long rowCount) {
+ /*
+ * When collecting guideposts, we don't care about the time at which a guide post is
+ * created/updated, so we pass 0 here. The update/create timestamp only matters
+ * when we are reading guideposts out of the SYSTEM.STATS table.
+ */
+ return trackGuidePost(row, byteCount, rowCount, 0);
+ }
+
/**
- * The guide posts, rowCount and byteCount are accumulated every time a guidePosts depth is
- * reached while collecting stats.
- * @param row
- * @param byteCount
- * @return
- * @throws IOException
+ * Track a new guide post
+ * @param row row key of the guidepost
+ * @param byteCount number of bytes in the guidepost
+ * @param updateTimestamp time at which guidepost was created/updated.
+ * @throws IOException
*/
- public boolean addGuidePosts(ImmutableBytesWritable row, long byteCount, long rowCount) {
+ public boolean trackGuidePost(ImmutableBytesWritable row, long byteCount, long rowCount,
+ long updateTimestamp) {
if (row.getLength() != 0 && lastRow.compareTo(row) < 0) {
try {
encoder.encode(output, row.get(), row.getOffset(), row.getLength());
rowCounts.add(rowCount);
byteCounts.add(byteCount);
+ guidePostsTimestamps.add(updateTimestamp);
this.guidePostsCount++;
this.maxLength = encoder.getMaxLength();
lastRow = row;
@@ -98,22 +114,10 @@ public class GuidePostsInfoBuilder {
return false;
}
- public boolean addGuidePosts(byte[] row) {
- return addGuidePosts(new ImmutableBytesWritable(row), 0, 0);
- }
-
- public boolean addGuidePosts(byte[] row, long byteCount) {
- return addGuidePosts(new ImmutableBytesWritable(row), byteCount, 0);
- }
-
- public boolean addGuidePosts(byte[] row, long byteCount, long rowCount) {
- return addGuidePosts(new ImmutableBytesWritable(row), byteCount, rowCount);
- }
-
public GuidePostsInfo build() {
this.guidePosts.set(stream.getBuffer(), 0, stream.size());
GuidePostsInfo guidePostsInfo = new GuidePostsInfo(this.byteCounts, this.guidePosts, this.rowCounts,
- this.maxLength, this.guidePostsCount);
+ this.maxLength, this.guidePostsCount, this.guidePostsTimestamps);
return guidePostsInfo;
}
@@ -128,5 +132,9 @@ public class GuidePostsInfoBuilder {
public long getRowCount() {
return rowCount;
}
+
+ public boolean hasGuidePosts() {
+ return guidePostsCount > 0;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 8306940..ab6b3db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -155,7 +155,7 @@ public class StatisticsScanner implements InternalScanner {
LOG.debug("Deleting the stats for the region " + regionInfo.getRegionNameAsString()
+ " as part of major compaction");
}
- getStatisticsWriter().deleteStats(region, tracker, family, mutations);
+ getStatisticsWriter().deleteStatsForRegion(region, tracker, family, mutations);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString()
+ " as part of major compaction");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 07b412f..0b9c409 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -19,11 +19,7 @@ package org.apache.phoenix.schema.stats;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -37,7 +33,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.SortOrder;
@@ -122,7 +117,7 @@ public class StatisticsUtil {
return rowKey;
}
- private static byte[] getAdjustedKey(byte[] key, byte[] tableNameBytes, ImmutableBytesWritable cf, boolean nextKey) {
+ public static byte[] getAdjustedKey(byte[] key, byte[] tableNameBytes, ImmutableBytesWritable cf, boolean nextKey) {
if (Bytes.compareTo(key, ByteUtil.EMPTY_BYTE_ARRAY) != 0) {
return getRowKey(tableNameBytes, cf, key);
}
@@ -133,30 +128,6 @@ public class StatisticsUtil {
return key;
}
- public static List<Result> readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf,
- byte[] startKey, byte[] stopKey, long clientTimeStamp) throws IOException {
- List<Result> statsForRegion = new ArrayList<Result>();
- Scan s = MetaDataUtil.newTableRowsScan(
- getAdjustedKey(startKey, tableNameBytes, cf, false),
- getAdjustedKey(stopKey, tableNameBytes, cf, true),
- MetaDataProtocol.MIN_TABLE_TIMESTAMP,
- clientTimeStamp);
- s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
- ResultScanner scanner = null;
- try {
- scanner = statsHTable.getScanner(s);
- Result result = null;
- while ((result = scanner.next()) != null) {
- statsForRegion.add(result);
- }
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- return statsForRegion;
- }
-
public static GuidePostsInfo readStatistics(HTableInterface statsHTable, GuidePostsKey key, long clientTimeStamp)
throws IOException {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -168,8 +139,9 @@ public class StatisticsUtil {
s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT_BYTES);
s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
- GuidePostsInfoBuilder guidePostsInfoWriter = new GuidePostsInfoBuilder();
+ GuidePostsInfoBuilder guidePostsInfoBuilder = new GuidePostsInfoBuilder();
Cell current = null;
+ GuidePostsInfo emptyGuidePost = null;
try (ResultScanner scanner = statsHTable.getScanner(s)) {
Result result = null;
while ((result = scanner.next()) != null) {
@@ -198,22 +170,24 @@ public class StatisticsUtil {
ptr.set(current.getRowArray(), cfOffset, cfLength);
byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
byte[] newGPStartKey = getGuidePostsInfoFromRowKey(tableNameBytes, cfName, result.getRow());
- guidePostsInfoWriter.addGuidePosts(newGPStartKey, byteCount, rowCount);
+ boolean isEmptyGuidePost = GuidePostsInfo.isEmptyGpsKey(newGPStartKey);
+ // Use the timestamp of the cell as the time at which guidepost was
+ // created/updated
+ long guidePostUpdateTime = current.getTimestamp();
+ if (isEmptyGuidePost) {
+ emptyGuidePost =
+ GuidePostsInfo.createEmptyGuidePost(byteCount, guidePostUpdateTime);
+ } else {
+ guidePostsInfoBuilder.trackGuidePost(
+ new ImmutableBytesWritable(newGPStartKey), byteCount, rowCount,
+ guidePostUpdateTime);
+ }
}
}
}
// We write a row with an empty KeyValue in the case that stats were generated but without enough data
// for any guideposts. If we have no rows, it means stats were never generated.
- return current == null ? GuidePostsInfo.NO_GUIDEPOST : guidePostsInfoWriter.isEmpty() ? GuidePostsInfo.EMPTY_GUIDEPOST : guidePostsInfoWriter.build();
- }
-
- private static SortedMap<byte[], GuidePostsInfo> getGuidePostsPerCf(
- TreeMap<byte[], GuidePostsInfoBuilder> guidePostsWriterPerCf) {
- TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR);
- for (byte[] key : guidePostsWriterPerCf.keySet()) {
- guidePostsPerCf.put(key, guidePostsWriterPerCf.get(key).build());
- }
- return guidePostsPerCf;
+ return current == null ? GuidePostsInfo.NO_GUIDEPOST : guidePostsInfoBuilder.isEmpty() ? emptyGuidePost : guidePostsInfoBuilder.build();
}
public static long getGuidePostDepth(int guidepostPerRegion, long guidepostWidth, HTableDescriptor tableDesc) {
@@ -230,8 +204,8 @@ public class StatisticsUtil {
return guidepostWidth;
}
}
-
- public static byte[] getGuidePostsInfoFromRowKey(byte[] tableNameBytes, byte[] fam, byte[] row) {
+
+ public static byte[] getGuidePostsInfoFromRowKey(byte[] tableNameBytes, byte[] fam, byte[] row) {
if (row.length > tableNameBytes.length + 1 + fam.length) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
int gpOffset = tableNameBytes.length + 1 + fam.length + 1;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index fb8d664..3ae3183 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.schema.stats;
+import static org.apache.phoenix.schema.stats.StatisticsUtil.getAdjustedKey;
+
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInput;
@@ -24,6 +26,7 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.sql.Date;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -33,6 +36,8 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -42,12 +47,15 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRo
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PrefixByteDecoder;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -68,16 +76,16 @@ public class StatisticsWriter implements Closeable {
* @throws IOException
* if the table cannot be created due to an underlying HTable creation error
*/
- public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp)
+ public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, long guidePostDepth)
throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
- clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+ clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
}
HTableInterface statsWriterTable = env.getTable(
SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, env.getConfiguration()));
HTableInterface statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName,
- clientTimeStamp);
+ clientTimeStamp, guidePostDepth);
return statsTable;
}
@@ -88,14 +96,15 @@ public class StatisticsWriter implements Closeable {
private final HTableInterface statsReaderTable;
private final byte[] tableName;
private final long clientTimeStamp;
- private final ImmutableBytesWritable minKeyPtr = new ImmutableBytesWritable();
-
+ private final long guidePostDepth;
+
private StatisticsWriter(HTableInterface statsReaderTable, HTableInterface statsWriterTable, String tableName,
- long clientTimeStamp) {
+ long clientTimeStamp, long guidePostDepth) {
this.statsReaderTable = statsReaderTable;
this.statsWriterTable = statsWriterTable;
this.tableName = Bytes.toBytes(tableName);
this.clientTimeStamp = clientTimeStamp;
+ this.guidePostDepth = guidePostDepth;
}
/**
@@ -159,7 +168,17 @@ public class StatisticsWriter implements Closeable {
Delete delete = new Delete(rowKey, timeStamp);
mutations.add(delete);
} else {
- addGuidepost(cfKey, mutations, ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY, 0, 0, timeStamp);
+ /*
+ * When there is not enough data in the region, we create a guide post with empty
+ * key with the estimated amount of data in it as the guide post width. We can't
+ * determine the expected number of rows here since we don't have the PTable and the
+ * associated schema available to make the row size estimate. We instead will
+ * compute it on the client side when reading out guideposts from the SYSTEM.STATS
+ * table in StatisticsUtil#readStatistics(HTableInterface statsHTable,
+ * GuidePostsKey key, long clientTimeStamp).
+ */
+ addGuidepost(cfKey, mutations, ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY, guidePostDepth,
+ 0, timeStamp);
}
}
}
@@ -209,7 +228,7 @@ public class StatisticsWriter implements Closeable {
}
private Put getLastStatsUpdatedTimePut(long timeStamp) {
- long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
byte[] prefix = tableName;
Put put = new Put(prefix);
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES,
@@ -223,12 +242,25 @@ public class StatisticsWriter implements Closeable {
statsWriterTable.put(put);
}
- public void deleteStats(Region region, StatisticsCollector tracker, ImmutableBytesPtr fam, List<Mutation> mutations)
- throws IOException {
- long timeStamp = clientTimeStamp == DefaultStatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp()
- : clientTimeStamp;
- List<Result> statsForRegion = StatisticsUtil.readStatistics(statsWriterTable, tableName, fam,
- region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey(), timeStamp);
+ public void deleteStatsForRegion(Region region, StatisticsCollector tracker, ImmutableBytesPtr fam,
+ List<Mutation> mutations) throws IOException {
+ long timeStamp =
+ clientTimeStamp == DefaultStatisticsCollector.NO_TIMESTAMP
+ ? tracker.getMaxTimeStamp() : clientTimeStamp;
+ byte[] startKey = region.getRegionInfo().getStartKey();
+ byte[] stopKey = region.getRegionInfo().getEndKey();
+ List<Result> statsForRegion = new ArrayList<Result>();
+ Scan s =
+ MetaDataUtil.newTableRowsScan(getAdjustedKey(startKey, tableName, fam, false),
+ getAdjustedKey(stopKey, tableName, fam, true),
+ MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);
+ s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
+ try (ResultScanner scanner = statsWriterTable.getScanner(s)) {
+ Result result = null;
+ while ((result = scanner.next()) != null) {
+ statsForRegion.add(result);
+ }
+ }
for (Result result : statsForRegion) {
mutations.add(new Delete(result.getRow(), timeStamp - 1));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
index d946870..bde049b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
@@ -20,11 +20,16 @@ package org.apache.phoenix.schema.tuple;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.util.KeyValueUtil;
-
+/**
+ * Tuple that can be used to represent a list of cells. It is imperative that the list of cells
+ * passed in is sorted using the {@link KeyValue#COMPARATOR}; otherwise, doing binary search to find
+ * a particular cell will fail.
+ */
public class MultiKeyValueTuple extends BaseTuple {
private List<Cell> values;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/NumberUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/NumberUtil.java
index 67b298f..646f502 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/NumberUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/NumberUtil.java
@@ -67,4 +67,22 @@ public class NumberUtil {
return num1 + num2;
}
}
+
+ /**
+ * @return If both are null, then return null. If one is null, return the other. Else, return
+ * minimum of the two.
+ */
+ public static Long getMin(Long num1, Long num2) {
+ if (num1 == null) {
+ if (num2 == null) {
+ return null;
+ }
+ return num2;
+ } else {
+ if (num2 == null) {
+ return num1;
+ }
+ return Math.min(num1, num2);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index a0fe9bb..59b3bb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -206,6 +206,13 @@ public class PhoenixRuntime {
PhoenixStatement.EXPLAIN_PLAN_ROWS_COLUMN_ALIAS;
/**
+ * Use this column name on the row returned by explain plan result set to get timestamp at which
+ * the estimate of the number of bytes/rows was collected
+ */
+ public static final String EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN =
+ PhoenixStatement.EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN_ALIAS;
+
+ /**
* All Phoenix specific connection properties
* TODO: use enum instead
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
index 9ba7f6d..4cb0468 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
@@ -25,6 +25,7 @@ import java.sql.PreparedStatement;
import java.util.Map;
import java.util.Properties;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.end2end.Shadower;
@@ -648,7 +649,7 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest {
final PTable table = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "PERF.BIG_OLAP_DOC"));
GuidePostsInfoBuilder gpWriter = new GuidePostsInfoBuilder();
for (byte[] gp : guidePosts) {
- gpWriter.addGuidePosts(gp, 1000);
+ gpWriter.trackGuidePost(new ImmutableBytesWritable(gp), 1000, 0, 0);
}
GuidePostsInfo info = gpWriter.build();
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 3980bc6..935d8cb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -481,6 +481,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
public Long getEstimatedBytesToScan() {
return null;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return null;
+ }
}, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null);
List<KeyRange> keyRanges = parallelIterators.getSplits();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
index 888f09a..dd96d8c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
@@ -115,7 +115,7 @@ public class StatisticsScannerTest {
@Test
public void testCheckRegionServerStoppingOnException() throws Exception {
StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable();
- doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class),
+ doThrow(new IOException()).when(statsWriter).deleteStatsForRegion(any(Region.class), any(StatisticsCollector.class),
any(ImmutableBytesPtr.class), any(List.class));
when(rsServices.isStopping()).thenReturn(true);
when(rsServices.isStopped()).thenReturn(false);
@@ -130,7 +130,7 @@ public class StatisticsScannerTest {
@Test
public void testCheckRegionServerStoppedOnException() throws Exception {
StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable();
- doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class),
+ doThrow(new IOException()).when(statsWriter).deleteStatsForRegion(any(Region.class), any(StatisticsCollector.class),
any(ImmutableBytesPtr.class), any(List.class));
when(rsServices.isStopping()).thenReturn(false);
when(rsServices.isStopped()).thenReturn(true);
[2/2] phoenix git commit: PHOENIX-4007 Surface time at which byte/row
estimate information was computed in explain plan output
Posted by sa...@apache.org.
PHOENIX-4007 Surface time at which byte/row estimate information was computed in explain plan output
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6d8357e9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6d8357e9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6d8357e9
Branch: refs/heads/master
Commit: 6d8357e9029e639de952d76493203e161e237adb
Parents: 176f541
Author: Samarth Jain <sa...@apache.org>
Authored: Tue Sep 26 19:21:00 2017 -0700
Committer: Samarth Jain <sa...@apache.org>
Committed: Tue Sep 26 19:21:00 2017 -0700
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/ArrayIT.java | 3 +-
.../end2end/BaseUniqueNamesOwnClusterIT.java | 1 -
.../end2end/ExplainPlanWithStatsDisabledIT.java | 64 ++-
.../end2end/ExplainPlanWithStatsEnabledIT.java | 413 ++++++++++++++++---
.../phoenix/end2end/ParallelStatsEnabledIT.java | 1 +
.../phoenix/end2end/QueryWithOffsetIT.java | 10 +-
.../phoenix/end2end/StatsCollectorIT.java | 45 +-
.../phoenix/compile/BaseMutationPlan.java | 5 +
.../phoenix/compile/DelegateMutationPlan.java | 5 +
.../apache/phoenix/compile/DeleteCompiler.java | 48 ++-
.../phoenix/compile/ListJarsQueryPlan.java | 5 +
.../apache/phoenix/compile/StatementPlan.java | 6 +
.../apache/phoenix/compile/TraceQueryPlan.java | 5 +
.../apache/phoenix/compile/UpsertCompiler.java | 15 +-
.../coprocessor/BaseScannerRegionObserver.java | 1 -
.../UngroupedAggregateRegionObserver.java | 14 +-
.../apache/phoenix/execute/AggregatePlan.java | 1 +
.../apache/phoenix/execute/BaseQueryPlan.java | 12 +
.../phoenix/execute/DelegateQueryPlan.java | 5 +
.../apache/phoenix/execute/HashJoinPlan.java | 30 +-
.../execute/LiteralResultIterationPlan.java | 5 +
.../org/apache/phoenix/execute/ScanPlan.java | 12 +-
.../phoenix/execute/SortMergeJoinPlan.java | 34 +-
.../org/apache/phoenix/execute/UnionPlan.java | 27 +-
.../phoenix/iterate/BaseResultIterators.java | 75 +++-
.../apache/phoenix/jdbc/PhoenixStatement.java | 53 ++-
.../stats/DefaultStatisticsCollector.java | 48 +--
.../phoenix/schema/stats/GuidePostsInfo.java | 59 ++-
.../schema/stats/GuidePostsInfoBuilder.java | 48 ++-
.../phoenix/schema/stats/StatisticsScanner.java | 2 +-
.../phoenix/schema/stats/StatisticsUtil.java | 62 +--
.../phoenix/schema/stats/StatisticsWriter.java | 60 ++-
.../schema/tuple/MultiKeyValueTuple.java | 7 +-
.../org/apache/phoenix/util/NumberUtil.java | 18 +
.../org/apache/phoenix/util/PhoenixRuntime.java | 7 +
.../phoenix/filter/SkipScanBigFilterTest.java | 3 +-
.../query/ParallelIteratorsSplitTest.java | 5 +
.../schema/stats/StatisticsScannerTest.java | 4 +-
38 files changed, 958 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java
index d05a200..cf86614 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArrayIT.java
@@ -131,8 +131,7 @@ public class ArrayIT extends ParallelStatsDisabledIT {
String query = "SELECT a_double_array, /* comment ok? */ b_string, a_float FROM " + tableName + " WHERE ?=organization_id and ?=a_float";
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
- //TODO: samarth do we need this
- analyzeTable(conn, tableName);
+ analyzeTable(conn, tableName);
try {
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseUniqueNamesOwnClusterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseUniqueNamesOwnClusterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseUniqueNamesOwnClusterIT.java
index 9401b2c..7ccbaaf 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseUniqueNamesOwnClusterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseUniqueNamesOwnClusterIT.java
@@ -18,7 +18,6 @@
package org.apache.phoenix.end2end;
import org.apache.phoenix.query.BaseTest;
-import org.junit.AfterClass;
import org.junit.experimental.categories.Category;
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java
index ff4127d..c2d9b52 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java
@@ -20,12 +20,13 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.end2end.ExplainPlanWithStatsEnabledIT.getByteRowEstimates;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.ExplainPlanWithStatsEnabledIT.Estimate;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -54,7 +55,7 @@ public class ExplainPlanWithStatsDisabledIT extends ParallelStatsDisabledIT {
private static void initData(String tableName) throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute("CREATE TABLE " + tableName
- + " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k))");
+ + " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k)) GUIDE_POSTS_WIDTH = 0");
conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)");
conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)");
conn.createStatement().execute("upsert into " + tableName + " values (102,2,4)");
@@ -66,6 +67,9 @@ public class ExplainPlanWithStatsDisabledIT extends ParallelStatsDisabledIT {
conn.createStatement().execute("upsert into " + tableName + " values (108,2,4)");
conn.createStatement().execute("upsert into " + tableName + " values (109,2,4)");
conn.commit();
+ // Because the guide post width is set to 0, no guide posts will be collected,
+ // effectively disabling stats collection.
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
}
}
@@ -167,9 +171,10 @@ public class ExplainPlanWithStatsDisabledIT extends ParallelStatsDisabledIT {
binds.add(99);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 200l, info.getSecond());
- assertEquals((Long) 2l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 200l, info.estimatedBytes);
+ assertEquals((Long) 2l, info.estimatedRows);
+ assertTrue(info.estimatedRows > 0);
}
}
@@ -183,30 +188,57 @@ public class ExplainPlanWithStatsDisabledIT extends ParallelStatsDisabledIT {
assertEstimatesAreZero(sql, binds, conn);
}
}
-
+
@Test
public void testBytesRowsForSelectExecutedSerially() throws Exception {
String sql = "SELECT * FROM " + tableA + " LIMIT 2";
List<Object> binds = Lists.newArrayList();
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 200l, info.getSecond());
- assertEquals((Long) 2l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 200l, info.estimatedBytes);
+ assertEquals((Long) 2l, info.estimatedRows);
+ assertTrue(info.estimatedRows > 0);
+ }
+ }
+
+ @Test
+ public void testEstimatesForUnionWithTablesWithNullAndLargeGpWidth() throws Exception {
+ String tableWithLargeGPWidth = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // create a table with 1 MB guidepost width
+ long guidePostWidth = 1000000;
+ conn.createStatement()
+ .execute("CREATE TABLE " + tableWithLargeGPWidth
+ + " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k)) GUIDE_POSTS_WIDTH="
+ + guidePostWidth);
+ conn.createStatement()
+ .execute("upsert into " + tableWithLargeGPWidth + " values (100,1,3)");
+ conn.createStatement()
+ .execute("upsert into " + tableWithLargeGPWidth + " values (101,2,4)");
+ conn.commit();
+ conn.createStatement().execute("UPDATE STATISTICS " + tableWithLargeGPWidth);
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String sql =
+ "SELECT * FROM " + tableA + " UNION ALL SELECT * FROM " + tableWithLargeGPWidth;
+ assertEstimatesAreNull(sql, Lists.newArrayList(), conn);
}
}
- private void assertEstimatesAreNull(String sql, List<Object> binds, Connection conn)
+ public static void assertEstimatesAreNull(String sql, List<Object> binds, Connection conn)
throws Exception {
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertNull(info.getSecond());
- assertNull(info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertNull(info.estimatedBytes);
+ assertNull(info.estimatedRows);
+ assertNull(info.estimateInfoTs);
}
private void assertEstimatesAreZero(String sql, List<Object> binds, Connection conn)
throws Exception {
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 0l, info.getSecond());
- assertEquals((Long) 0l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 0l, info.estimatedBytes);
+ assertEquals((Long) 0l, info.estimatedRows);
+ assertEquals((Long) 0l, info.estimateInfoTs);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index 49a0485..cd4555c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -18,29 +18,29 @@
package org.apache.phoenix.end2end;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FOR_PARALLELIZATION;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
/**
* This class has tests for asserting the bytes and rows information exposed in the explain plan
@@ -50,22 +50,37 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
private static String tableA;
private static String tableB;
+ private static String tableWithLargeGPWidth;
+ private static String indexOnA;
+ private static final long largeGpWidth = 2 * 1000 * 1000l;
@BeforeClass
- public static void doSetup() throws Exception {
- Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
- props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ public static void createTables() throws Exception {
tableA = generateUniqueName();
- initDataAndStats(tableA);
+ initDataAndStats(tableA, 20l);
tableB = generateUniqueName();
- initDataAndStats(tableB);
+ initDataAndStats(tableB, 20l);
+ tableWithLargeGPWidth = generateUniqueName();
+ initDataAndStats(tableWithLargeGPWidth, largeGpWidth);
+ indexOnA = generateUniqueName();
+ createIndex(indexOnA, tableA, 20);
}
- private static void initDataAndStats(String tableName) throws Exception {
+ private static void createIndex(String indexName, String table, long guidePostWidth)
+ throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().execute("CREATE TABLE " + tableName
- + " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k))");
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + table
+ + " (c1.a) INCLUDE (c2.b) GUIDE_POSTS_WIDTH = " + guidePostWidth);
+ conn.createStatement().execute("UPDATE STATISTICS " + indexName);
+ }
+ }
+
+ private static void initDataAndStats(String tableName, Long guidePostWidth) throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement()
+ .execute("CREATE TABLE " + tableName
+ + " (k INTEGER PRIMARY KEY, c1.a bigint, c2.b bigint)"
+ + " GUIDE_POSTS_WIDTH=" + guidePostWidth);
conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)");
conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)");
conn.createStatement().execute("upsert into " + tableName + " values (102,2,4)");
@@ -81,25 +96,73 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
}
}
+ private static Connection getTenantConnection(String tenantId) throws SQLException {
+ String url = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+ return DriverManager.getConnection(url);
+ }
+
@Test
- public void testBytesRowsForSelect() throws Exception {
+ public void testBytesRowsForSelectWhenKeyOutOfRange() throws Exception {
String sql = "SELECT * FROM " + tableA + " where k >= ?";
List<Object> binds = Lists.newArrayList();
+ binds.add(200);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 0l, info.estimatedBytes);
+ assertEquals((Long) 0l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
+ }
+ }
+
+ @Test
+ public void testBytesRowsForSelectWhenKeyInRange() throws Exception {
+ String sql = "SELECT * FROM " + tableB + " where k >= ?";
+ List<Object> binds = Lists.newArrayList();
binds.add(99);
try (Connection conn = DriverManager.getConnection(getUrl())) {
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 634l, info.getSecond());
- assertEquals((Long) 10l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 634l, info.estimatedBytes);
+ assertEquals((Long) 10l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
+ }
+ }
+
+ @Test
+ public void testBytesRowsForSelectOnIndex() throws Exception {
+ String sql = "SELECT * FROM " + tableA + " where c1.a >= ?";
+ List<Object> binds = Lists.newArrayList();
+ binds.add(0);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 691l, info.estimatedBytes);
+ assertEquals((Long) 10l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@Test
public void testBytesRowsForUnion() throws Exception {
- String sql = "SELECT * FROM " + tableA + " UNION ALL SELECT * FROM " + tableB;
+ String sql =
+ "SELECT /*+ NO_INDEX */ * FROM " + tableA + " UNION ALL SELECT * FROM " + tableB;
try (Connection conn = DriverManager.getConnection(getUrl())) {
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, Lists.newArrayList());
- assertEquals((Long) (2 * 634l), info.getSecond());
- assertEquals((Long) (2 * 10l), info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, Lists.newArrayList());
+ assertEquals((Long) (2 * 634l), info.estimatedBytes);
+ assertEquals((Long) (2 * 10l), info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
+ }
+ }
+
+ @Test
+ public void testEstimatesForUnionWithTableWithLargeGpWidth() throws Exception {
+ // For table with largeGpWidth, a guide post is generated that has the
+ // byte size estimate of guide post width.
+ String sql =
+ "SELECT /*+ NO_INDEX */ * FROM " + tableA + " UNION ALL SELECT * FROM " + tableB
+ + " UNION ALL SELECT * FROM " + tableWithLargeGPWidth;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ Estimate info = getByteRowEstimates(conn, sql, Lists.newArrayList());
+ assertEquals((Long) (2 * 634 + largeGpWidth), info.estimatedBytes);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@@ -109,21 +172,23 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
"SELECT ta.c1.a, ta.c2.b FROM " + tableA + " ta JOIN " + tableB
+ " tb ON ta.k = tb.k";
try (Connection conn = DriverManager.getConnection(getUrl())) {
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, Lists.newArrayList());
- assertEquals((Long) (634l), info.getSecond());
- assertEquals((Long) (10l), info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, Lists.newArrayList());
+ assertEquals((Long) (634l), info.estimatedBytes);
+ assertEquals((Long) (10l), info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@Test
public void testBytesRowsForSortMergeJoin() throws Exception {
String sql =
- "SELECT /*+ USE_SORT_MERGE_JOIN */ ta.c1.a, ta.c2.b FROM " + tableA + " ta JOIN "
- + tableB + " tb ON ta.k = tb.k";
+ "SELECT /*+ NO_INDEX USE_SORT_MERGE_JOIN */ ta.c1.a, ta.c2.b FROM " + tableA
+ + " ta JOIN " + tableB + " tb ON ta.k = tb.k";
try (Connection conn = DriverManager.getConnection(getUrl())) {
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, Lists.newArrayList());
- assertEquals((Long) (2 * 634l), info.getSecond());
- assertEquals((Long) (2 * 10l), info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, Lists.newArrayList());
+ assertEquals((Long) (2 * 634l), info.estimatedBytes);
+ assertEquals((Long) (2 * 10l), info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@@ -133,33 +198,36 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
List<Object> binds = Lists.newArrayList();
binds.add(99);
try (Connection conn = DriverManager.getConnection(getUrl())) {
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 634l, info.getSecond());
- assertEquals((Long) 10l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 634l, info.estimatedBytes);
+ assertEquals((Long) 10l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@Test
public void testBytesRowsForUpsertSelectServerSide() throws Exception {
- String sql = "UPSERT INTO " + tableA + " SELECT * FROM " + tableA;
+ String sql = "UPSERT INTO " + tableA + " SELECT * FROM " + tableB;
List<Object> binds = Lists.newArrayList();
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(true);
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 634l, info.getSecond());
- assertEquals((Long) 10l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 634l, info.estimatedBytes);
+ assertEquals((Long) 10l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@Test
public void testBytesRowsForUpsertSelectClientSide() throws Exception {
- String sql = "UPSERT INTO " + tableA + " SELECT * FROM " + tableA;
+ String sql = "UPSERT INTO " + tableB + " SELECT * FROM " + tableB;
List<Object> binds = Lists.newArrayList();
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 634l, info.getSecond());
- assertEquals((Long) 10l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 634l, info.estimatedBytes);
+ assertEquals((Long) 10l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@@ -171,9 +239,10 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
binds.add(99);
binds.add(99);
try (Connection conn = DriverManager.getConnection(getUrl())) {
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 0l, info.getSecond());
- assertEquals((Long) 0l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 0l, info.estimatedBytes);
+ assertEquals((Long) 0l, info.estimatedRows);
+ assertEquals((Long) 0l, info.estimateInfoTs);
}
}
@@ -184,9 +253,10 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
binds.add(99);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(true);
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 634l, info.getSecond());
- assertEquals((Long) 10l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 634l, info.estimatedBytes);
+ assertEquals((Long) 10l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@@ -197,9 +267,10 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
binds.add(99);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 200l, info.getSecond());
- assertEquals((Long) 2l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 200l, info.estimatedBytes);
+ assertEquals((Long) 2l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
@@ -210,9 +281,10 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
binds.add(100);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 0l, info.getSecond());
- assertEquals((Long) 0l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 0l, info.estimatedBytes);
+ assertEquals((Long) 0l, info.estimatedRows);
+ assertEquals((Long) 0l, info.estimateInfoTs);
}
}
@@ -222,17 +294,31 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
List<Object> binds = Lists.newArrayList();
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
- Pair<Long, Long> info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 200l, info.getSecond());
- assertEquals((Long) 2l, info.getFirst());
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 200l, info.estimatedBytes);
+ assertEquals((Long) 2l, info.estimatedRows);
+ assertTrue(info.estimateInfoTs > 0);
}
}
- public static Pair<Long, Long> getByteRowEstimates(Connection conn, String sql,
- List<Object> bindValues) throws Exception {
+ public static class Estimate {
+ final Long estimatedBytes;
+ final Long estimatedRows;
+ final Long estimateInfoTs;
+
+ Estimate(Long rows, Long bytes, Long ts) {
+ this.estimatedBytes = bytes;
+ this.estimatedRows = rows;
+ this.estimateInfoTs = ts;
+ }
+ }
+
+ public static Estimate getByteRowEstimates(Connection conn, String sql, List<Object> bindValues)
+ throws Exception {
String explainSql = "EXPLAIN " + sql;
Long estimatedBytes = null;
Long estimatedRows = null;
+ Long estimateInfoTs = null;
try (PreparedStatement statement = conn.prepareStatement(explainSql)) {
int paramIdx = 1;
for (Object bind : bindValues) {
@@ -244,8 +330,10 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
(Long) rs.getObject(PhoenixRuntime.EXPLAIN_PLAN_ESTIMATED_BYTES_READ_COLUMN);
estimatedRows =
(Long) rs.getObject(PhoenixRuntime.EXPLAIN_PLAN_ESTIMATED_ROWS_READ_COLUMN);
+ estimateInfoTs =
+ (Long) rs.getObject(PhoenixRuntime.EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN);
}
- return new Pair<>(estimatedRows, estimatedBytes);
+ return new Estimate(estimatedRows, estimatedBytes, estimateInfoTs);
}
@Test
@@ -263,14 +351,17 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
table = generateUniqueName();
ddl = "CREATE TABLE " + table + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)";
conn.createStatement().execute(ddl);
- assertUseStatsForQueryFlag(table, conn.unwrap(PhoenixConnection.class), DEFAULT_USE_STATS_FOR_PARALLELIZATION);
+ assertUseStatsForQueryFlag(table, conn.unwrap(PhoenixConnection.class),
+ DEFAULT_USE_STATS_FOR_PARALLELIZATION);
}
}
private static void assertUseStatsForQueryFlag(String tableName, PhoenixConnection conn,
boolean flag) throws TableNotFoundException, SQLException {
- assertEquals(flag, conn.unwrap(PhoenixConnection.class).getMetaDataCache()
- .getTableRef(new PTableKey(null, tableName)).getTable().useStatsForParallelization());
+ assertEquals(flag,
+ conn.unwrap(PhoenixConnection.class).getMetaDataCache()
+ .getTableRef(new PTableKey(null, tableName)).getTable()
+ .useStatsForParallelization());
String query =
"SELECT USE_STATS_FOR_PARALLELIZATION FROM SYSTEM.CATALOG WHERE TABLE_NAME = ? AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL AND TENANT_ID IS NULL";
PreparedStatement stmt = conn.prepareStatement(query);
@@ -279,4 +370,200 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
rs.next();
assertEquals(flag, rs.getBoolean(1));
}
+
+ @Test
+ public void testBytesRowsForSelectOnTenantViews() throws Exception {
+ String tenant1View = generateUniqueName();
+ ;
+ String tenant2View = generateUniqueName();
+ ;
+ String tenant3View = generateUniqueName();
+ ;
+ String multiTenantBaseTable = generateUniqueName();
+ String tenant1 = "tenant1";
+ String tenant2 = "tenant2";
+ String tenant3 = "tenant3";
+ MyClock clock = new MyClock(1000);
+ createMultitenantTableAndViews(tenant1View, tenant2View, tenant3View, tenant1, tenant2,
+ tenant3, multiTenantBaseTable, clock);
+
+ // query the entire multitenant table
+ String sql = "SELECT * FROM " + multiTenantBaseTable + " WHERE ORGID >= ?";
+ List<Object> binds = Lists.newArrayList();
+ binds.add("tenant0");
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 817l, info.estimatedBytes);
+ assertEquals((Long) 10l, info.estimatedRows);
+ assertEquals((Long) clock.currentTime(), info.estimateInfoTs);
+ }
+ binds.clear();
+ // query tenant1 view
+ try (Connection conn = getTenantConnection(tenant1)) {
+ sql = "SELECT * FROM " + tenant1View;
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 143l, info.estimatedBytes);
+ assertEquals((Long) 2l, info.estimatedRows);
+ assertEquals((Long) clock.currentTime(), info.estimateInfoTs);
+ }
+ // query tenant2 view
+ try (Connection conn = getTenantConnection(tenant2)) {
+ sql = "SELECT * FROM " + tenant2View;
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 143l, info.estimatedBytes);
+ assertEquals((Long) 2l, info.estimatedRows);
+ assertEquals((Long) clock.currentTime(), info.estimateInfoTs);
+ }
+ // query tenant3 view
+ try (Connection conn = getTenantConnection(tenant3)) {
+ sql = "SELECT * FROM " + tenant3View;
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 531l, info.estimatedBytes);
+ assertEquals((Long) 6l, info.estimatedRows);
+ assertEquals((Long) clock.currentTime(), info.estimateInfoTs);
+ }
+ /*
+ * Now we will add some rows to tenant1view and run update stats on it. We will do this after
+ * advancing our clock by 1000 seconds. This way we can check that only the region for
+ * tenant1 will have updated guidepost with the new timestamp.
+ */
+ long prevTenant1Bytes = 143l;
+ long prevGuidePostTimestamp = clock.currentTime();
+ clock.advanceTime(1000);
+ try {
+ EnvironmentEdgeManager.injectEdge(clock);
+ // Update tenant1 view
+ try (Connection conn = getTenantConnection(tenant1)) {
+ // upsert a few rows for tenantView
+ conn.createStatement()
+ .executeUpdate("UPSERT INTO " + tenant1View + " VALUES (11, 11, 11)");
+ conn.createStatement()
+ .executeUpdate("UPSERT INTO " + tenant1View + " VALUES (12, 12, 12)");
+ conn.createStatement()
+ .executeUpdate("UPSERT INTO " + tenant1View + " VALUES (13, 13, 13)");
+ conn.createStatement()
+ .executeUpdate("UPSERT INTO " + tenant1View + " VALUES (14, 14, 14)");
+ conn.createStatement()
+ .executeUpdate("UPSERT INTO " + tenant1View + " VALUES (15, 15, 15)");
+ conn.createStatement()
+ .executeUpdate("UPSERT INTO " + tenant1View + " VALUES (16, 16, 16)");
+ conn.commit();
+ // run update stats on the tenantView
+ conn.createStatement().executeUpdate("UPDATE STATISTICS " + tenant1View);
+ // get estimates now and check if they were updated as expected
+ sql = "SELECT * FROM " + tenant1View;
+ Estimate info = getByteRowEstimates(conn, sql, Collections.emptyList());
+ assertTrue(info.estimatedBytes > prevTenant1Bytes);
+ assertEquals((Long) 8l, info.estimatedRows);
+ assertEquals((Long) clock.currentTime(), info.estimateInfoTs);
+ }
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ // Now check estimates again for tenantView2 and tenantView3. They should stay the same.
+ try (Connection conn = getTenantConnection(tenant2)) {
+ sql = "SELECT * FROM " + tenant2View;
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 143l, info.estimatedBytes);
+ assertEquals((Long) 2l, info.estimatedRows);
+ assertEquals((Long) prevGuidePostTimestamp, info.estimateInfoTs);
+ }
+ try (Connection conn = getTenantConnection(tenant3)) {
+ sql = "SELECT * FROM " + tenant3View;
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 531l, info.estimatedBytes);
+ assertEquals((Long) 6l, info.estimatedRows);
+ assertEquals((Long) prevGuidePostTimestamp, info.estimateInfoTs);
+ }
+ /*
+ * Now let's query the base table and see estimates. Because we use the minimum timestamp
+ * for all guideposts that we will be scanning, the timestamp for the estimate info for this
+ * query should be prevGuidePostTimestamp.
+ */
+ binds.clear();
+ binds.add("tenant0");
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ sql = "SELECT * FROM " + multiTenantBaseTable + " WHERE ORGID >= ?";
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 1399l, info.estimatedBytes);
+ assertEquals((Long) 16l, info.estimatedRows);
+ assertEquals((Long) prevGuidePostTimestamp, info.estimateInfoTs);
+ }
+ }
+
+ private static void createMultitenantTableAndViews(String tenant1View, String tenant2View,
+ String tenant3View, String tenant1, String tenant2, String tenant3,
+ String multiTenantTable, MyClock clock) throws SQLException {
+ byte[][] splits =
+ new byte[][] { Bytes.toBytes(tenant1), Bytes.toBytes(tenant2),
+ Bytes.toBytes(tenant3) };
+ String ddl =
+ "CREATE TABLE " + multiTenantTable
+ + " (orgId CHAR(15) NOT NULL, pk2 integer NOT NULL, c1.a bigint, c2.b bigint CONSTRAINT PK PRIMARY KEY "
+ + "(ORGID, PK2)) MULTI_TENANT=true, GUIDE_POSTS_WIDTH=2";
+ // Use our own clock to get rows created with our controlled timestamp
+ try {
+ EnvironmentEdgeManager.injectEdge(clock);
+ createTestTable(getUrl(), ddl, splits, null);
+ clock.advanceTime(1000);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ /**
+ * Insert 2 rows each for tenant1 and tenant2 and 6 rows for tenant3
+ */
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant1 + "',1,1,1)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant1 + "',2,2,2)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant2 + "',3,3,3)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant2 + "',4,4,4)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant3 + "',5,5,5)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant3 + "',6,6,6)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant3 + "',7,7,7)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant3 + "',8,8,8)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant3 + "',9,9,9)");
+ conn.createStatement().execute(
+ "upsert into " + multiTenantTable + " values ('" + tenant3 + "',10,10,10)");
+ conn.commit();
+ conn.createStatement().execute("UPDATE STATISTICS " + multiTenantTable);
+ }
+ try (Connection conn = getTenantConnection(tenant1)) {
+ conn.createStatement().execute(
+ "CREATE VIEW " + tenant1View + " AS SELECT * FROM " + multiTenantTable);
+ }
+ try (Connection conn = getTenantConnection(tenant2)) {
+ conn.createStatement().execute(
+ "CREATE VIEW " + tenant2View + " AS SELECT * FROM " + multiTenantTable);
+ }
+ try (Connection conn = getTenantConnection(tenant3)) {
+ conn.createStatement().execute(
+ "CREATE VIEW " + tenant3View + " AS SELECT * FROM " + multiTenantTable);
+ }
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ private static class MyClock extends EnvironmentEdge {
+ public volatile long time;
+
+ public MyClock(long time) {
+ this.time = time;
+ }
+
+ @Override
+ public long currentTime() {
+ return time;
+ }
+
+ public void advanceTime(long t) {
+ this.time += t;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
index a62d50d..d6b3924 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
@@ -41,6 +41,7 @@ public abstract class ParallelStatsEnabledIT extends BaseTest {
public static void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+ props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
index cab75b7..aff22af 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -34,7 +34,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.junit.Before;
@@ -162,14 +166,14 @@ public class QueryWithOffsetIT extends ParallelStatsDisabledIT {
initTableValues(conn);
updateStatistics(conn);
ResultSet rs;
- rs = conn.createStatement()
- .executeQuery("SELECT t_id from " + tableName + " order by t_id offset " + offset + " row");
int i = 0;
+ rs =
+ conn.createStatement().executeQuery(
+ "SELECT t_id from " + tableName + " order by t_id offset " + offset + " row");
while (i++ < STRINGS.length - offset) {
assertTrue(rs.next());
assertEquals(STRINGS[offset + i - 1], rs.getString(1));
}
-
rs = conn.createStatement().executeQuery(
"SELECT k3, count(*) from " + tableName + " group by k3 order by k3 desc offset " + offset + " row");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index d2c8e6f..19b5275 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -43,7 +43,6 @@ import java.util.Random;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -58,8 +57,9 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -177,7 +177,7 @@ public class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName);
String explainPlan = QueryUtil.getExplainPlan(rs);
assertEquals(
- "CLIENT 1-CHUNK 0 ROWS 0 BYTES PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName + "\n" +
+ "CLIENT 1-CHUNK 0 ROWS 20 BYTES PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName + "\n" +
" SERVER FILTER BY FIRST KEY ONLY",
explainPlan);
conn.close();
@@ -197,7 +197,7 @@ public class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + fullTableName + " WHERE v2='foo'");
explainPlan = QueryUtil.getExplainPlan(rs);
// if we are using the ONE_CELL_PER_COLUMN_FAMILY storage scheme, we will have the single kv even though there are no values for col family v2
- String stats = columnEncoded && !mutable ? "4-CHUNK 1 ROWS 38 BYTES" : "3-CHUNK 0 ROWS 0 BYTES";
+ String stats = columnEncoded && !mutable ? "4-CHUNK 1 ROWS 38 BYTES" : "3-CHUNK 0 ROWS 20 BYTES";
assertEquals(
"CLIENT " + stats + " PARALLEL 3-WAY FULL SCAN OVER " + physicalTableName + "\n" +
" SERVER FILTER BY B.V2 = 'foo'\n" +
@@ -707,5 +707,42 @@ public class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
assertEquals("Number of expected rows in stats table after major compaction didn't match", numRows, rs.getInt(1));
}
}
+
+ @Test
+ public void testEmptyGuidePostGeneratedWhenDataSizeLessThanGPWidth() throws Exception {
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ long guidePostWidth = 20000000;
+ conn.createStatement()
+ .execute("CREATE TABLE " + tableName
+ + " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k)) GUIDE_POSTS_WIDTH="
+ + guidePostWidth + ", SALT_BUCKETS = 4");
+ conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)");
+ conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)");
+ conn.commit();
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+ ConnectionQueryServices queryServices =
+ conn.unwrap(PhoenixConnection.class).getQueryServices();
+ try (HTableInterface statsHTable =
+ queryServices.getTable(
+ SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
+ queryServices.getProps()).getName())) {
+ GuidePostsInfo gps =
+ StatisticsUtil.readStatistics(statsHTable,
+ new GuidePostsKey(Bytes.toBytes(tableName), Bytes.toBytes("C1")),
+ HConstants.LATEST_TIMESTAMP);
+ assertTrue(gps.isEmptyGuidePost());
+ assertEquals(guidePostWidth, gps.getByteCounts()[0]);
+ assertTrue(gps.getGuidePostTimestamps()[0] > 0);
+ gps =
+ StatisticsUtil.readStatistics(statsHTable,
+ new GuidePostsKey(Bytes.toBytes(tableName), Bytes.toBytes("C2")),
+ HConstants.LATEST_TIMESTAMP);
+ assertTrue(gps.isEmptyGuidePost());
+ assertEquals(guidePostWidth, gps.getByteCounts()[0]);
+ assertTrue(gps.getGuidePostTimestamps()[0] > 0);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
index 276dc9b..0e45682 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
@@ -74,4 +74,9 @@ public abstract class BaseMutationPlan implements MutationPlan {
return 0l;
}
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
index 005ae1f..343ec32 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
@@ -77,4 +77,9 @@ public class DelegateMutationPlan implements MutationPlan {
return plan.getEstimatedBytesToScan();
}
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return plan.getEstimateInfoTimestamp();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index b2fd17c..be07cf4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -309,6 +309,13 @@ public class DeleteCompiler {
public Long getEstimatedRowsToScan() throws SQLException {
Long estRows = null;
for (MutationPlan plan : plans) {
+ /*
+ * If any plan doesn't have estimate information available, then we cannot
+ * provide an estimate for the overall plan.
+ */
+ if (plan.getEstimatedRowsToScan() == null) {
+ return null;
+ }
estRows = add(estRows, plan.getEstimatedRowsToScan());
}
return estRows;
@@ -318,12 +325,36 @@ public class DeleteCompiler {
public Long getEstimatedBytesToScan() throws SQLException {
Long estBytes = null;
for (MutationPlan plan : plans) {
+ /*
+ * If any plan doesn't have estimate information available, then we cannot
+ * provide an estimate for the overall plan.
+ */
+ if (plan.getEstimatedBytesToScan() == null) {
+ return null;
+ }
estBytes = add(estBytes, plan.getEstimatedBytesToScan());
}
return estBytes;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ Long estInfoTimestamp = Long.MAX_VALUE;
+ for (MutationPlan plan : plans) {
+ Long timestamp = plan.getEstimateInfoTimestamp();
+ /*
+ * If any plan doesn't have an estimate timestamp available, then we cannot
+ * provide an estimate timestamp for the overall plan.
+ */
+ if (timestamp == null) {
+ return timestamp;
+ }
+ estInfoTimestamp = Math.min(estInfoTimestamp, timestamp);
+ }
+ return estInfoTimestamp;
+ }
}
-
+
private static boolean hasNonPKIndexedColumns(Collection<PTable> immutableIndexes) {
for (PTable index : immutableIndexes) {
for (PColumn column : index.getPKColumns()) {
@@ -562,6 +593,11 @@ public class DeleteCompiler {
public Long getEstimatedBytesToScan() throws SQLException {
return 0l;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
});
} else if (runOnServer) {
// TODO: better abstraction
@@ -659,6 +695,11 @@ public class DeleteCompiler {
public Long getEstimatedBytesToScan() throws SQLException {
return aggPlan.getEstimatedBytesToScan();
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return aggPlan.getEstimateInfoTimestamp();
+ }
});
} else {
List<TableRef> immutableIndexRefsToBe = Lists.newArrayListWithExpectedSize(dataPlan.getTableRef().getTable().getIndexes().size());
@@ -746,6 +787,11 @@ public class DeleteCompiler {
public Long getEstimatedBytesToScan() throws SQLException {
return plan.getEstimatedBytesToScan();
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return plan.getEstimateInfoTimestamp();
+ }
});
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index c6d03c1..839e7c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -264,4 +264,9 @@ public class ListJarsQueryPlan implements QueryPlan {
public Long getEstimatedBytesToScan() {
return 0l;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java
index 6d381d9..c74b1c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java
@@ -48,4 +48,10 @@ public interface StatementPlan {
* Returns null if the estimate cannot be provided.
*/
public Long getEstimatedBytesToScan() throws SQLException;
+
+ /**
+ * @return timestamp at which the estimate information (estimated bytes and estimated rows) was
+ * computed. Returns null if the information cannot be provided.
+ */
+ public Long getEstimateInfoTimestamp() throws SQLException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index c061ec6..62e6991 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -272,4 +272,9 @@ public class TraceQueryPlan implements QueryPlan {
public Long getEstimatedBytesToScan() {
return 0l;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index c384292..6f45e28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -764,6 +764,11 @@ public class UpsertCompiler {
public Long getEstimatedBytesToScan() throws SQLException {
return aggPlan.getEstimatedBytesToScan();
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return aggPlan.getEstimateInfoTimestamp();
+ }
};
}
}
@@ -846,7 +851,11 @@ public class UpsertCompiler {
public Long getEstimatedBytesToScan() throws SQLException {
return queryPlan.getEstimatedBytesToScan();
}
-
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return queryPlan.getEstimateInfoTimestamp();
+ }
};
}
@@ -1086,6 +1095,10 @@ public class UpsertCompiler {
return 0l;
}
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
};
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 83cc24e..34361ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor;
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a2a1b5c..582e606 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -905,7 +905,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
InternalScanner internalScanner = scanner;
if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
try {
- long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+ long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
c.getEnvironment(), table.getNameAsString(), clientTimeStamp,
store.getFamily().getName());
@@ -1163,7 +1163,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
* Package private for tests.
*/
static class StatsCollectionCallable implements Callable<Long> {
- private final StatisticsCollector stats;
+ private final StatisticsCollector statsCollector;
private final Region region;
private final RegionScanner innerScanner;
private final Configuration config;
@@ -1171,7 +1171,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
StatsCollectionCallable(StatisticsCollector s, Region r, RegionScanner rs,
Configuration config, Scan scan) {
- this.stats = s;
+ this.statsCollector = s;
this.region = r;
this.innerScanner = rs;
this.config = config;
@@ -1197,12 +1197,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
long rowCount = 0;
try {
if (!compactionRunning) {
- stats.init();
+ statsCollector.init();
synchronized (innerScanner) {
do {
List<Cell> results = new ArrayList<Cell>();
hasMore = innerScanner.nextRaw(results);
- stats.collectStatistics(results);
+ statsCollector.collectStatistics(results);
rowCount++;
compactionRunning = areStatsBeingCollectedViaCompaction();
} while (hasMore && !compactionRunning);
@@ -1216,7 +1216,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} finally {
try {
if (noErrors && !compactionRunning) {
- stats.updateStatistic(region, scan);
+ statsCollector.updateStatistic(region, scan);
logger.info("UPDATE STATISTICS finished successfully for scanner: "
+ innerScanner + ". Number of rows scanned: " + rowCount
+ ". Time: " + (System.currentTimeMillis() - startTime));
@@ -1228,7 +1228,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} finally {
try {
StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo());
- stats.close();
+ statsCollector.close();
} finally {
try {
innerScanner.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 74c8d39..4c29abe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -227,6 +227,7 @@ public class AggregatePlan extends BaseQueryPlan {
: new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches);
estimatedRows = iterators.getEstimatedRowCount();
estimatedSize = iterators.getEstimatedByteCount();
+ estimateInfoTimestamp = iterators.getEstimateInfoTimestamp();
splits = iterators.getSplits();
scans = iterators.getScans();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 238a537..c1ddd44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -114,6 +114,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
protected final Expression dynamicFilter;
protected Long estimatedRows;
protected Long estimatedSize;
+ protected Long estimateInfoTimestamp;
private boolean explainPlanCalled;
@@ -503,6 +504,9 @@ public abstract class BaseQueryPlan implements QueryPlan {
// Optimize here when getting explain plan, as queries don't get optimized until after compilation
QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan();
+ this.estimatedRows = plan.getEstimatedRowsToScan();
+ this.estimatedSize = plan.getEstimatedBytesToScan();
+ this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
return exp;
}
@@ -533,4 +537,12 @@ public abstract class BaseQueryPlan implements QueryPlan {
return estimatedSize;
}
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ if (!explainPlanCalled) {
+ getExplainPlan();
+ }
+ return estimateInfoTimestamp;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index cde1410..3c62c5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -151,4 +151,9 @@ public abstract class DelegateQueryPlan implements QueryPlan {
public Long getEstimatedBytesToScan() throws SQLException {
return delegate.getEstimatedBytesToScan();
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return delegate.getEstimateInfoTimestamp();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 879aa61..2b90dcb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
import static org.apache.phoenix.util.NumberUtil.add;
+import static org.apache.phoenix.util.NumberUtil.getMin;
import java.sql.SQLException;
import java.util.Collections;
@@ -96,6 +97,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
private List<Expression> keyRangeExpressions;
private Long estimatedRows;
private Long estimatedBytes;
+ private Long estimateInfoTs;
private boolean explainPlanCalled;
public static HashJoinPlan create(SelectStatement statement,
@@ -261,8 +263,24 @@ public class HashJoinPlan extends DelegateQueryPlan {
planSteps.add(" JOIN-SCANNER " + joinInfo.getLimit() + " ROW LIMIT");
}
for (SubPlan subPlan : subPlans) {
- estimatedBytes = add(estimatedBytes, subPlan.getInnerPlan().getEstimatedBytesToScan());
- estimatedRows = add(estimatedRows, subPlan.getInnerPlan().getEstimatedRowsToScan());
+ if (subPlan.getInnerPlan().getEstimatedBytesToScan() == null
+ || subPlan.getInnerPlan().getEstimatedRowsToScan() == null
+ || subPlan.getInnerPlan().getEstimateInfoTimestamp() == null) {
+ /*
+ * If any of the sub plans doesn't have the estimate info available, then we don't
+ * provide an estimate for the overall plan.
+ */
+ estimatedBytes = null;
+ estimatedRows = null;
+ estimateInfoTs = null;
+ break;
+ } else {
+ estimatedBytes =
+ add(estimatedBytes, subPlan.getInnerPlan().getEstimatedBytesToScan());
+ estimatedRows = add(estimatedRows, subPlan.getInnerPlan().getEstimatedRowsToScan());
+ estimateInfoTs =
+ getMin(estimateInfoTs, subPlan.getInnerPlan().getEstimateInfoTimestamp());
+ }
}
return new ExplainPlan(planSteps);
}
@@ -486,6 +504,14 @@ public class HashJoinPlan extends DelegateQueryPlan {
}
return estimatedBytes;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ if (!explainPlanCalled) {
+ getExplainPlan();
+ }
+ return estimateInfoTs;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 781c07e..86f59c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -125,4 +125,9 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
public Long getEstimatedBytesToScan() {
return 0l;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index f5b1af0..1e1cb0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -64,6 +64,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -88,6 +89,7 @@ public class ScanPlan extends BaseQueryPlan {
private boolean isDataToScanWithinThreshold;
private Long serialRowsEstimate;
private Long serialBytesEstimate;
+ private Long serialEstimateInfoTs;
public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null);
@@ -114,6 +116,7 @@ public class ScanPlan extends BaseQueryPlan {
if (isSerial) {
serialBytesEstimate = estimate.getFirst();
serialRowsEstimate = estimate.getSecond();
+ serialEstimateInfoTs = EnvironmentEdgeManager.currentTimeMillis();
}
}
@@ -240,6 +243,7 @@ public class ScanPlan extends BaseQueryPlan {
}
estimatedRows = iterators.getEstimatedRowCount();
estimatedSize = iterators.getEstimatedByteCount();
+ estimateInfoTimestamp = iterators.getEstimateInfoTimestamp();
splits = iterators.getSplits();
scans = iterators.getScans();
if (isOffsetOnServer) {
@@ -302,5 +306,11 @@ public class ScanPlan extends BaseQueryPlan {
return super.getEstimatedBytesToScan();
}
-
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ if (isSerial) {
+ return serialEstimateInfoTs;
+ }
+ return super.getEstimateInfoTimestamp();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 568094a..fab7c59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.execute;
import static org.apache.phoenix.util.NumberUtil.add;
+import static org.apache.phoenix.util.NumberUtil.getMin;
import java.io.IOException;
import java.nio.MappedByteBuffer;
@@ -92,6 +93,7 @@ public class SortMergeJoinPlan implements QueryPlan {
private final int thresholdBytes;
private Long estimatedBytes;
private Long estimatedRows;
+ private Long estimateInfoTs;
private boolean explainPlanCalled;
public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table,
@@ -164,8 +166,28 @@ public class SortMergeJoinPlan implements QueryPlan {
for (String step : rhsPlan.getExplainPlan().getPlanSteps()) {
steps.add(" " + step);
}
- estimatedBytes = add(add(estimatedBytes, lhsPlan.getEstimatedBytesToScan()), rhsPlan.getEstimatedBytesToScan());
- estimatedRows = add(add(estimatedRows, lhsPlan.getEstimatedRowsToScan()), rhsPlan.getEstimatedRowsToScan());
+ if ((lhsPlan.getEstimatedBytesToScan() == null || rhsPlan.getEstimatedBytesToScan() == null)
+ || (lhsPlan.getEstimatedRowsToScan() == null
+ || rhsPlan.getEstimatedRowsToScan() == null)
+ || (lhsPlan.getEstimateInfoTimestamp() == null
+ || rhsPlan.getEstimateInfoTimestamp() == null)) {
+ /*
+ * If any of the sub plans doesn't have the estimate info available, then we don't
+ * provide an estimate for the overall plan.
+ */
+ estimatedBytes = null;
+ estimatedRows = null;
+ estimateInfoTs = null;
+ } else {
+ estimatedBytes =
+ add(add(estimatedBytes, lhsPlan.getEstimatedBytesToScan()),
+ rhsPlan.getEstimatedBytesToScan());
+ estimatedRows =
+ add(add(estimatedRows, lhsPlan.getEstimatedRowsToScan()),
+ rhsPlan.getEstimatedRowsToScan());
+ estimateInfoTs =
+ getMin(lhsPlan.getEstimateInfoTimestamp(), rhsPlan.getEstimateInfoTimestamp());
+ }
return new ExplainPlan(steps);
}
@@ -727,4 +749,12 @@ public class SortMergeJoinPlan implements QueryPlan {
}
return estimatedBytes;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ if (!explainPlanCalled) {
+ getExplainPlan();
+ }
+ return estimateInfoTs;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d8357e9/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index fd50a83..e06522f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.execute;
import static org.apache.phoenix.util.NumberUtil.add;
+import static org.apache.phoenix.util.NumberUtil.getMin;
import java.sql.ParameterMetaData;
import java.sql.SQLException;
@@ -66,6 +67,7 @@ public class UnionPlan implements QueryPlan {
private UnionResultIterators iterators;
private Long estimatedRows;
private Long estimatedBytes;
+ private Long estimateInfoTs;
private boolean explainPlanCalled;
public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
@@ -182,8 +184,21 @@ public class UnionPlan implements QueryPlan {
steps.set(i, " " + steps.get(i));
}
for (QueryPlan plan : plans) {
- estimatedRows = add(estimatedRows, plan.getEstimatedRowsToScan());
- estimatedBytes = add(estimatedBytes, plan.getEstimatedBytesToScan());
+ if (plan.getEstimatedBytesToScan() == null || plan.getEstimatedRowsToScan() == null
+ || plan.getEstimateInfoTimestamp() == null) {
+ /*
+ * If any of the sub plans doesn't have the estimate info available, then we don't
+ * provide an estimate for the overall plan.
+ */
+ estimatedBytes = null;
+ estimatedRows = null;
+ estimateInfoTs = null;
+ break;
+ } else {
+ estimatedBytes = add(estimatedBytes, plan.getEstimatedBytesToScan());
+ estimatedRows = add(estimatedRows, plan.getEstimatedRowsToScan());
+ estimateInfoTs = getMin(estimateInfoTs, plan.getEstimateInfoTimestamp());
+ }
}
return new ExplainPlan(steps);
}
@@ -253,4 +268,12 @@ public class UnionPlan implements QueryPlan {
}
return estimatedBytes;
}
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ if (!explainPlanCalled) {
+ getExplainPlan();
+ }
+ return estimateInfoTs;
+ }
}