Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 02:17:07 UTC
[2/7] phoenix git commit: PHOENIX-1432 Run limit query that has only leading PK column filter serially
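
To make the intent concrete before the diff: with this change, a LIMIT query whose only filter is on the leading primary key column is executed as a single serial scan instead of a parallel one, and EXPLAIN reports SERIAL rather than PARALLEL. A minimal JDBC sketch of how that surfaces (the table name, column name, and connection URL below are illustrative and not part of this commit; the exact plan text for a concrete table is asserted in the new QueryPlanTest near the end of this diff):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    // Illustrative only: MY_TABLE has PK1 as its leading primary key column.
    public class SerialLimitExplainSketch {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            ResultSet rs = conn.createStatement().executeQuery(
                    "EXPLAIN SELECT * FROM MY_TABLE WHERE PK1 = 'abc' LIMIT 1");
            while (rs.next()) {
                // Expect the plan to start with "CLIENT SERIAL 1-WAY ..."
                // rather than "CLIENT PARALLEL ...".
                System.out.println(rs.getString(1));
            }
        }
    }
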
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 7905d34..bde3f78 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,68 +17,23 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
-
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.filter.ColumnProjectionFilter;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.job.JobManager.JobCallable;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode.Hint;
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.stats.GuidePostsInfo;
-import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -90,537 +45,43 @@ import com.google.common.collect.Lists;
*
* @since 0.1
*/
-public class ParallelIterators extends ExplainTable implements ResultIterators {
+public class ParallelIterators extends BaseResultIterators {
private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
- private final List<List<Scan>> scans;
- private final List<KeyRange> splits;
- private final PTableStats tableStats;
- private final byte[] physicalTableName;
- private final QueryPlan plan;
+ private static final String NAME = "PARALLEL";
private final ParallelIteratorFactory iteratorFactory;
- public static interface ParallelIteratorFactory {
- PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
- }
-
- private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
- private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
-
- static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
- @Override
- public KeyRange apply(HRegionLocation region) {
- return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
- }
- };
-
- private PTable getTable() {
- return plan.getTableRef().getTable();
- }
-
- private boolean useStats() {
- Scan scan = context.getScan();
- boolean isPointLookup = context.getScanRanges().isPointLookup();
- /*
- * Don't use guide posts if:
- * 1) We're doing a point lookup, as HBase is fast enough at those
- * to not need them to be further parallelized. TODO: pref test to verify
- * 2) We're collecting stats, as in this case we need to scan entire
- * regions worth of data to track where to put the guide posts.
- */
- if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
- return false;
- }
- return true;
- }
-
public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
throws SQLException {
- super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
- this.plan = plan;
- StatementContext context = plan.getContext();
- TableRef tableRef = plan.getTableRef();
- PTable table = tableRef.getTable();
- FilterableStatement statement = plan.getStatement();
- RowProjector projector = plan.getProjector();
- physicalTableName = table.getPhysicalName().getBytes();
- tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
- Scan scan = context.getScan();
- if (projector.isProjectEmptyKeyValue()) {
- Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
- // If nothing projected into scan and we only have one column family, just allow everything
- // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
- // be quite a bit faster.
- // Where condition columns also will get added into familyMap
- // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning.
- if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
- && table.getColumnFamilies().size() == 1) {
- // Project the one column family. We must project a column family since it's possible
- // that there are other non declared column families that we need to ignore.
- scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
- ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
- } else {
- byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
- // Project empty key value unless the column family containing it has
- // been projected in its entirety.
- if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
- scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
- }
- }
- } else if (table.getViewType() == ViewType.MAPPED) {
- // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
- // selected column values are returned back to client
- for (PColumnFamily family : table.getColumnFamilies()) {
- scan.addFamily(family.getName().getBytes());
- }
- }
-
- // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
- if (perScanLimit != null) {
- ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
- }
-
- doColumnProjectionOptimization(context, scan, table, statement);
-
+ super(plan, perScanLimit);
this.iteratorFactory = iteratorFactory;
- this.scans = getParallelScans();
- List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
- for (List<Scan> scanList : scans) {
- for (Scan aScan : scanList) {
- splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
- }
- }
- this.splits = ImmutableList.copyOf(splitRanges);
- }
-
- private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
- Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
- if (familyMap != null && !familyMap.isEmpty()) {
- // columnsTracker contain cf -> qualifiers which should get returned.
- Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker =
- new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
- Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- int referencedCfCount = familyMap.size();
- for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
- if (!(familyMap.containsKey(whereCol.getFirst()))) {
- referencedCfCount++;
- }
- }
- boolean useOptimization;
- if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
- // Do not use the optimization
- useOptimization = false;
- } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
- // Strictly use the optimization
- useOptimization = true;
- } else {
- // when referencedCfCount is >1 and no Hints, we are not using the optimization
- useOptimization = referencedCfCount == 1;
- }
- if (useOptimization) {
- for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
- ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
- NavigableSet<byte[]> qs = entry.getValue();
- NavigableSet<ImmutableBytesPtr> cols = null;
- if (qs != null) {
- cols = new TreeSet<ImmutableBytesPtr>();
- for (byte[] q : qs) {
- cols.add(new ImmutableBytesPtr(q));
- }
- }
- columnsTracker.put(cf, cols);
- }
- }
- // Making sure that where condition CFs are getting scanned at HRS.
- for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
- if (useOptimization) {
- if (!(familyMap.containsKey(whereCol.getFirst()))) {
- scan.addFamily(whereCol.getFirst());
- conditionOnlyCfs.add(whereCol.getFirst());
- }
- } else {
- if (familyMap.containsKey(whereCol.getFirst())) {
- // where column's CF is present. If there are some specific columns added against this CF, we
- // need to ensure this where column also getting added in it.
- // If the select was like select cf1.*, then that itself will select the whole CF. So no need to
- // specifically add the where column. Adding that will remove the cf1.* stuff and only this
- // where condition column will get returned!
- NavigableSet<byte[]> cols = familyMap.get(whereCol.getFirst());
- // cols is null means the whole CF will get scanned.
- if (cols != null) {
- scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
- }
- } else {
- // where column's CF itself is not present in family map. We need to add the column
- scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
- }
- }
- }
- if (useOptimization && !columnsTracker.isEmpty()) {
- for (ImmutableBytesPtr f : columnsTracker.keySet()) {
- // This addFamily will remove explicit cols in scan familyMap and make it as entire row.
- // We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter
- scan.addFamily(f.get());
- }
- // We don't need this filter for aggregates, as we're not returning back what's
- // in the scan in this case. We still want the other optimization that causes
- // the ExplicitColumnTracker not to be used, though.
- if (!(statement.isAggregate())) {
- ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
- columnsTracker, conditionOnlyCfs));
- }
- }
- }
- }
-
- public List<KeyRange> getSplits() {
- return splits;
- }
-
- public List<List<Scan>> getScans() {
- return scans;
- }
-
- private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
- int nBoundaries = regionLocations.size() - 1;
- List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
- for (int i = 0; i < nBoundaries; i++) {
- HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
- ranges.add(regionInfo.getEndKey());
- }
- return ranges;
- }
-
- private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
- int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
- // If we found an exact match, return the index+1, as the inclusiveKey will be contained
- // in the next region (since we're matching on the end boundary).
- guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
- return guideIndex;
- }
-
- private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
- int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
- // If we found an exact match, return the index we found as the exclusiveKey won't be
- // contained in the next region as with getIndexContainingInclusive.
- guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
- return guideIndex;
- }
-
- private List<byte[]> getGuidePosts() {
- /*
- * Don't use guide posts if:
- * 1) We're doing a point lookup, as HBase is fast enough at those
- * to not need them to be further parallelized. TODO: pref test to verify
- * 2) We're collecting stats, as in this case we need to scan entire
- * regions worth of data to track where to put the guide posts.
- */
- if (!useStats()) {
- return Collections.emptyList();
- }
-
- List<byte[]> gps = null;
- PTable table = getTable();
- Map<byte[],GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
- byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
- if (table.getColumnFamilies().isEmpty()) {
- // For sure we can get the defaultCF from the table
- if (guidePostMap.get(defaultCF) != null) {
- gps = guidePostMap.get(defaultCF).getGuidePosts();
- }
- } else {
- Scan scan = context.getScan();
- if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
- // If default CF is not used in scan, use first CF referenced in scan
- GuidePostsInfo guidePostsInfo = guidePostMap.get(scan.getFamilyMap().keySet().iterator().next());
- if (guidePostsInfo != null) {
- gps = guidePostsInfo.getGuidePosts();
- }
- } else {
- // Otherwise, favor use of default CF.
- if (guidePostMap.get(defaultCF) != null) {
- gps = guidePostMap.get(defaultCF).getGuidePosts();
- }
- }
- }
- if (gps == null) {
- return Collections.emptyList();
- }
- return gps;
- }
-
- private static String toString(List<byte[]> gps) {
- StringBuilder buf = new StringBuilder(gps.size() * 100);
- buf.append("[");
- for (int i = 0; i < gps.size(); i++) {
- buf.append(Bytes.toStringBinary(gps.get(i)));
- buf.append(",");
- if (i > 0 && i < gps.size()-1 && (i % 10) == 0) {
- buf.append("\n");
- }
- }
- buf.setCharAt(buf.length()-1, ']');
- return buf.toString();
- }
-
- private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
- PTable table = getTable();
- boolean startNewScanList = false;
- if (!plan.isRowKeyOrdered()) {
- startNewScanList = true;
- } else if (crossedRegionBoundary) {
- if (table.getIndexType() == IndexType.LOCAL) {
- startNewScanList = true;
- } else if (table.getBucketNum() != null) {
- startNewScanList = scans.isEmpty() ||
- ScanUtil.crossesPrefixBoundary(startKey,
- ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES),
- SaltingUtil.NUM_SALTING_BYTES);
- }
- }
- if (scan != null) {
- scans.add(scan);
- }
- if (startNewScanList && !scans.isEmpty()) {
- parallelScans.add(scans);
- scans = Lists.newArrayListWithExpectedSize(1);
- }
- return scans;
- }
-
- private List<List<Scan>> getParallelScans() throws SQLException {
- return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
}
- /**
- * Compute the list of parallel scans to run for a given query. The inner scans
- * may be concatenated together directly, while the other ones may need to be
- * merge sorted, depending on the query.
- * @return list of parallel scans to run for a given query.
- * @throws SQLException
- */
- private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
- Scan scan = context.getScan();
- List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
- .getAllTableRegions(physicalTableName);
-
- List<byte[]> regionBoundaries = toBoundaries(regionLocations);
- ScanRanges scanRanges = context.getScanRanges();
- PTable table = getTable();
- boolean isSalted = table.getBucketNum() != null;
- boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
- List<byte[]> gps = getGuidePosts();
- if (logger.isDebugEnabled()) {
- logger.debug("Guideposts: " + toString(gps));
- }
- boolean traverseAllRegions = isSalted || isLocalIndex;
- if (!traverseAllRegions) {
- byte[] scanStartRow = scan.getStartRow();
- if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
- startKey = scanStartRow;
- }
- byte[] scanStopRow = scan.getStopRow();
- if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) {
- stopKey = scanStopRow;
- }
- }
-
- int regionIndex = 0;
- int stopIndex = regionBoundaries.size();
- if (startKey.length > 0) {
- regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
- }
- if (stopKey.length > 0) {
- stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
- if (isLocalIndex) {
- stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
- }
- }
- List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
-
- byte[] currentKey = startKey;
- int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
- int gpsSize = gps.size();
- int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
- int keyOffset = 0;
- List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
- // Merge bisect with guideposts for all but the last region
- while (regionIndex <= stopIndex) {
- byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY;
- if (regionIndex == stopIndex) {
- endKey = stopKey;
- } else {
- endKey = regionBoundaries.get(regionIndex);
- }
- if (isLocalIndex) {
- HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
- endRegionKey = regionInfo.getEndKey();
- keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
- }
- while (guideIndex < gpsSize
- && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
- Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false);
- scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false);
- currentKey = currentGuidePost;
- guideIndex++;
- }
- Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset, true);
- if (isLocalIndex) {
- if (newScan != null) {
- newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
- } else if (!scans.isEmpty()) {
- scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
- }
- }
- scans = addNewScan(parallelScans, scans, newScan, endKey, true);
- currentKey = endKey;
- regionIndex++;
- }
- if (!scans.isEmpty()) { // Add any remaining scans
- parallelScans.add(scans);
- }
- if (logger.isDebugEnabled()) {
- logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans,
- ScanUtil.getCustomAnnotations(scan)));
- }
- return parallelScans;
- }
-
- public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
- if (!reverse) {
- return list;
- }
- return Lists.reverse(list);
- }
-
- private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
- if (!concatIterators.isEmpty()) {
- iterators.add(ConcatResultIterator.newConcatResultIterator(concatIterators));
- }
- }
- /**
- * Executes the scan in parallel across all regions, blocking until all scans are complete.
- * @return the result iterators for the scan of each region
- */
@Override
- public List<PeekingResultIterator> getIterators() throws SQLException {
- boolean success = false;
- boolean isReverse = ScanUtil.isReversed(context.getScan());
- boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
- final ConnectionQueryServices services = context.getConnection().getQueryServices();
- ReadOnlyProps props = services.getProps();
- int numSplits = size();
- List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
- List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
- // TODO: what purpose does this scanID serve?
- final UUID scanId = UUID.randomUUID();
- try {
- submitWork(scanId, scans, futures, splits.size());
- int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
- boolean clearedCache = false;
- for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
- List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
- for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
- try {
- PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
- concatIterators.add(iterator);
- } catch (ExecutionException e) {
- try { // Rethrow as SQLException
- throw ServerUtil.parseServerException(e);
- } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
- List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
- if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
- services.clearTableRegionCache(physicalTableName);
- clearedCache = true;
- }
- // Resubmit just this portion of work again
- Scan oldScan = scanPair.getFirst();
- byte[] startKey = oldScan.getStartRow();
- byte[] endKey = oldScan.getStopRow();
- if (isLocalIndex) {
- endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
- }
- List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
- // Add any concatIterators that were successful so far
- // as we need these to be in order
- addConcatResultIterator(iterators, concatIterators);
- concatIterators = Collections.emptyList();
- submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
- for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
- for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
- // Immediate do a get (not catching exception again) and then add the iterators we
- // get back immediately. They'll be sorted as expected, since they're replacing the
- // original one.
- PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
- iterators.add(iterator);
- }
- }
- }
- }
- }
- addConcatResultIterator(iterators, concatIterators);
- }
-
- success = true;
- return iterators;
- } catch (SQLException e) {
- throw e;
- } catch (Exception e) {
- throw ServerUtil.parseServerException(e);
- } finally {
- if (!success) {
- SQLCloseables.closeAllQuietly(iterators);
- // Don't call cancel, as it causes the HConnection to get into a funk
-// for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
-// future.getSecond().cancel(true);
-// }
- }
- }
- }
-
- private static final class ScanLocation {
- private final int outerListIndex;
- private final int innerListIndex;
- private final Scan scan;
- public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) {
- this.outerListIndex = outerListIndex;
- this.innerListIndex = innerListIndex;
- this.scan = scan;
- }
- public int getOuterListIndex() {
- return outerListIndex;
- }
- public int getInnerListIndex() {
- return innerListIndex;
- }
- public Scan getScan() {
- return scan;
- }
- }
- private void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+ protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
- final ConnectionQueryServices services = context.getConnection().getQueryServices();
- ExecutorService executor = services.getExecutor();
// Pre-populate nestedFutures lists so that we can shuffle the scans
// and add the future to the right nested list. By shuffling the scans
// we get better utilization of the cluster since our thread executor
// will spray the scans across machines as opposed to targeting a
// single one since the scans are in row key order.
- List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
+ ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
+ List<ScanLocator> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
for (int i = 0; i < nestedScans.size(); i++) {
List<Scan> scans = nestedScans.get(i);
List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
nestedFutures.add(futures);
for (int j = 0; j < scans.size(); j++) {
Scan scan = nestedScans.get(i).get(j);
- scanLocations.add(new ScanLocation(scan, i, j));
+ scanLocations.add(new ScanLocator(scan, i, j));
futures.add(null); // placeholder
}
}
+ // Shuffle so that we start execution across many machines
+ // before we fill up the thread pool
Collections.shuffle(scanLocations);
- for (ScanLocation scanLocation : scanLocations) {
+ for (ScanLocator scanLocation : scanLocations) {
final Scan scan = scanLocation.getScan();
- Future<PeekingResultIterator> future =
- executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
+ Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
@Override
public PeekingResultIterator call() throws Exception {
@@ -649,22 +110,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
}
@Override
- public int size() {
- return this.scans.size();
- }
-
- @Override
- public void explain(List<String> planSteps) {
- boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean(
- QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB,
- QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT);
- StringBuilder buf = new StringBuilder();
- buf.append("CLIENT " + (displayChunkCount ? (this.splits.size() + "-CHUNK ") : "") + "PARALLEL " + size() + "-WAY ");
- explain(buf.toString(),planSteps);
+ protected String getName() {
+ return NAME;
}
-
- @Override
- public String toString() {
- return "ParallelIterators [scans=" + scans + "]";
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
index 3051608..ef2b534 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
@@ -20,8 +20,13 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.query.KeyRange;
+
public interface ResultIterators {
- public List<PeekingResultIterator> getIterators() throws SQLException;
public int size();
+ public List<KeyRange> getSplits();
+ public List<List<Scan>> getScans();
public void explain(List<String> planSteps);
+ public List<PeekingResultIterator> getIterators() throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
new file mode 100644
index 0000000..5cb64a0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+
+/**
+ *
+ * Class that parallelizes the scan over a table using the ExecutorService provided. Each region of the table will be scanned in parallel with
+ * the results accessible through {@link #getIterators()}
+ *
+ *
+ * @since 0.1
+ */
+public class SerialIterators extends BaseResultIterators {
+ private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
+ private static final String NAME = "SERIAL";
+ private final ParallelIteratorFactory iteratorFactory;
+ private final int limit;
+
+ public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+ throws SQLException {
+ super(plan, perScanLimit);
+ Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
+ this.iteratorFactory = iteratorFactory;
+ this.limit = perScanLimit;
+ }
+
+ @Override
+ protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+ List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
+ // Pre-populate nestedFutures lists so that we can shuffle the scans
+ // and add the future to the right nested list. By shuffling the scans
+ // we get better utilization of the cluster since our thread executor
+ // will spray the scans across machines as opposed to targeting a
+ // single one since the scans are in row key order.
+ ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
+
+ for (final List<Scan> scans : nestedScans) {
+ Scan firstScan = scans.get(0);
+ Scan lastScan = scans.get(scans.size()-1);
+ final Scan overallScan = ScanUtil.newScan(firstScan);
+ overallScan.setStopRow(lastScan.getStopRow());
+ Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
+
+ @Override
+ public PeekingResultIterator call() throws Exception {
+ List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
+ for (final Scan scan : scans) {
+ long startTime = System.currentTimeMillis();
+ ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+ if (logger.isDebugEnabled()) {
+ logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
+ }
+ concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
+ }
+ PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
+ return new LimitingPeekingResultIterator(concatIterator, limit);
+ }
+
+ /**
+ * Defines the grouping for round robin behavior. All threads spawned to process
+ * this scan will be grouped together and time sliced with other simultaneously
+ * executing parallel scans.
+ */
+ @Override
+ public Object getJobId() {
+ return SerialIterators.this;
+ }
+ }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
+ // Add our singleton Future which will execute serially
+ nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
+ }
+ }
+
+ @Override
+ protected String getName() {
+ return NAME;
+ }
+}
\ No newline at end of file
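
Stepping back from the Phoenix specifics above: each inner list of scans becomes a single Future, its scans run one after another, the resulting iterators are concatenated, and the output is capped at the limit by LimitingPeekingResultIterator. A generic, Phoenix-independent sketch of that concat-then-limit pattern (the class below is illustrative and not part of this commit):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    // Generic illustration of the concat-then-limit idea used by SerialIterators.
    final class LimitedConcat<T> implements Iterator<T> {
        private final Iterator<Iterator<T>> parts;
        private Iterator<T> current;
        private int remaining;

        LimitedConcat(List<Iterator<T>> parts, int limit) {
            this.parts = parts.iterator();
            this.current = Collections.emptyIterator();
            this.remaining = limit;
        }

        @Override
        public boolean hasNext() {
            if (remaining <= 0) {
                return false;                       // limit reached: stop early
            }
            while (!current.hasNext() && parts.hasNext()) {
                current = parts.next();             // advance to the next piece in order
            }
            return current.hasNext();
        }

        @Override
        public T next() {
            remaining--;                            // count this row toward the limit
            return current.next();
        }
    }
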
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 2a5080e..a343b48 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryServices;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 6a68df3..ed1ef70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -30,7 +30,7 @@ import org.apache.phoenix.compile.IndexStatementRewriter;
import org.apache.phoenix.compile.QueryCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 4894b18..6c03780 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -149,6 +149,7 @@ public class QueryServicesOptions {
public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05;
public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min
+ public static final int DEFAULT_STATS_GUIDEPOST_PER_REGION = 0; // Uses guidepost width by default
public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 100 * 1024 *1024; // 100MB
public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 90c8324..9c85e63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -25,11 +25,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -40,7 +37,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.ByteUtil;
@@ -74,21 +70,14 @@ public class StatisticsCollector {
public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
Configuration config = env.getConfiguration();
- HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
- int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0);
- if (guidepostPerRegion > 0) {
- long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
- if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
- maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
- }
- guidepostDepth = maxFileSize / guidepostPerRegion;
- } else {
- guidepostDepth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
- }
+ int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+ long guidepostWidth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+ this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, env.getRegion().getTableDesc());
// Get the stats table associated with the current table on which the CP is
// triggered
- this.statsTable = StatisticsWriter.newWriter(statsHTable, tableName, clientTimeStamp);
+ this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp);
}
public long getMaxTimeStamp() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index d8ffd84..2a7047f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -24,6 +24,8 @@ import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -110,4 +112,19 @@ public class StatisticsUtil {
}
return PTableStats.EMPTY_STATS;
}
+
+ public static long getGuidePostDepth(int guidepostPerRegion, long guidepostWidth, HTableDescriptor tableDesc) {
+ if (guidepostPerRegion > 0) {
+ long maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+ if (tableDesc != null) {
+ long tableMaxFileSize = tableDesc.getMaxFileSize();
+ if (tableMaxFileSize >= 0) {
+ maxFileSize = tableMaxFileSize;
+ }
+ }
+ return maxFileSize / guidepostPerRegion;
+ } else {
+ return guidepostWidth;
+ }
+ }
}
\ No newline at end of file
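
A quick worked example of the refactored depth calculation (the numbers are illustrative; 10 GB is HConstants.DEFAULT_MAX_FILE_SIZE and 100 MB is the DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES constant shown in the QueryServicesOptions hunk above):

    // Illustrative arithmetic mirroring StatisticsUtil.getGuidePostDepth.
    public class GuidePostDepthSketch {
        public static void main(String[] args) {
            long maxFileSize = 10L * 1024 * 1024 * 1024;   // HConstants.DEFAULT_MAX_FILE_SIZE (10 GB)
            int guidepostPerRegion = 20;                   // hypothetical value for QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB
            long guidepostWidth = 100L * 1024 * 1024;      // DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES (100 MB)

            long depth = guidepostPerRegion > 0
                    ? maxFileSize / guidepostPerRegion     // 536870912 bytes, roughly one guidepost per 512 MB
                    : guidepostWidth;                      // guidepostPerRegion == 0: the width wins (the new default)
            System.out.println(depth);
        }
    }
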
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 6681042..f70c327 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -26,11 +26,13 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
@@ -43,6 +45,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TimeKeeper;
import com.google.protobuf.ServiceException;
@@ -63,24 +66,31 @@ public class StatisticsWriter implements Closeable {
* @throws IOException
* if the table cannot be created due to an underlying HTable creation error
*/
- public static StatisticsWriter newWriter(HTableInterface hTable, String tableName, long clientTimeStamp) throws IOException {
+ public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
}
- StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp);
+ HTableInterface statsWriterTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
+ HTableInterface statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
+ StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName, clientTimeStamp);
if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts yet
statsTable.commitLastStatsUpdatedTime();
}
return statsTable;
}
- private final HTableInterface statisticsTable;
+ private final HTableInterface statsWriterTable;
+ // In HBase 0.98.4 or above, the reader and writer will be the same.
+ // In pre HBase 0.98.4, there was a bug in using the HTable returned
+ // from a coprocessor for scans, so in that case it'll be different.
+ private final HTableInterface statsReaderTable;
private final byte[] tableName;
private final long clientTimeStamp;
- private StatisticsWriter(HTableInterface statsTable, String tableName, long clientTimeStamp) {
- this.statisticsTable = statsTable;
- this.tableName = PDataType.VARCHAR.toBytes(tableName);
+ private StatisticsWriter(HTableInterface statsReaderTable, HTableInterface statsWriterTable, String tableName, long clientTimeStamp) {
+ this.statsReaderTable = statsReaderTable;
+ this.statsWriterTable = statsWriterTable;
+ this.tableName = Bytes.toBytes(tableName);
this.clientTimeStamp = clientTimeStamp;
}
@@ -89,7 +99,7 @@ public class StatisticsWriter implements Closeable {
*/
@Override
public void close() throws IOException {
- statisticsTable.close();
+ statsWriterTable.close();
}
public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
@@ -100,7 +110,7 @@ public class StatisticsWriter implements Closeable {
}
long readTimeStamp = useMaxTimeStamp ? HConstants.LATEST_TIMESTAMP : clientTimeStamp;
byte[] famBytes = PDataType.VARCHAR.toBytes(fam);
- Result result = StatisticsUtil.readRegionStatistics(statisticsTable, tableName, famBytes, p.getRegionName(), readTimeStamp);
+ Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, famBytes, p.getRegionName(), readTimeStamp);
if (result != null && !result.isEmpty()) {
Cell cell = result.getColumnLatestCell(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
@@ -211,7 +221,7 @@ public class StatisticsWriter implements Closeable {
mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(getMutationType(m), m));
}
MutateRowsRequest mrm = mrmBuilder.build();
- CoprocessorRpcChannel channel = statisticsTable.coprocessorService(row);
+ CoprocessorRpcChannel channel = statsWriterTable.coprocessorService(row);
MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel);
try {
@@ -235,7 +245,7 @@ public class StatisticsWriter implements Closeable {
// Always use wallclock time for this, as it's a mechanism to prevent
// stats from being collected too often.
Put put = getLastStatsUpdatedTimePut(clientTimeStamp);
- statisticsTable.put(put);
+ statsWriterTable.put(put);
}
public void deleteStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index f1e625b..1ca246f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -33,6 +33,7 @@ import java.util.Properties;
import javax.annotation.Nullable;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -69,6 +70,7 @@ import com.google.common.base.Preconditions;
*/
public class SchemaUtil {
private static final int VAR_LENGTH_ESTIMATE = 10;
+ private static final int VAR_KV_LENGTH_ESTIMATE = 50;
public static final String ESCAPE_CHARACTER = "\"";
public static final DataBlockEncoding DEFAULT_DATA_BLOCK_ENCODING = DataBlockEncoding.FAST_DIFF;
public static final PDatum VAR_BINARY_DATUM = new PDatum() {
@@ -110,6 +112,28 @@ public class SchemaUtil {
public static boolean isPKColumn(PColumn column) {
return column.getFamilyName() == null;
}
+
+ /**
+ * Imperfect estimate of row size given a PTable
+ * TODO: keep row count in stats table and use total size / row count instead
+ * @param table
+ * @return estimate of size in bytes of a row
+ */
+ public static long estimateRowSize(PTable table) {
+ int keyLength = estimateKeyLength(table);
+ long rowSize = 0;
+ for (PColumn column : table.getColumns()) {
+ if (!SchemaUtil.isPKColumn(column)) {
+ PDataType type = column.getDataType();
+ Integer maxLength = column.getMaxLength();
+ int valueLength = !type.isFixedWidth() ? VAR_KV_LENGTH_ESTIMATE : maxLength == null ? type.getByteSize() : maxLength;
+ rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, column.getFamilyName().getBytes().length, column.getName().getBytes().length, valueLength);
+ }
+ }
+ // Empty key value
+ rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, getEmptyColumnFamily(table).length, QueryConstants.EMPTY_COLUMN_BYTES.length, 0);
+ return rowSize;
+ }
/**
* Estimate the max key length in bytes of the PK for a given table
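
A rough illustration of what the new estimateRowSize adds up for one non-PK column plus the empty key value (the 30-byte key length and the family and qualifier lengths below are made up for the example; 50 is the VAR_KV_LENGTH_ESTIMATE constant introduced above, and "_0" is Phoenix's empty column qualifier):

    import org.apache.hadoop.hbase.KeyValue;

    // Illustrative only: one variable-width column V in column family 0,
    // on a table whose estimated row key length is 30 bytes.
    public class RowSizeSketch {
        public static void main(String[] args) {
            int keyLength = 30;                                 // hypothetical estimateKeyLength(table)
            long rowSize = 0;
            // variable-width column -> value length estimated at VAR_KV_LENGTH_ESTIMATE (50)
            rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, 1 /* "0" */, 1 /* "V" */, 50);
            // the empty key value Phoenix writes for every row: qualifier "_0", empty value
            rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, 1 /* "0" */, 2 /* "_0" */, 0);
            System.out.println(rowSize);
        }
    }
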
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 3327dba..7205faa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.util.VersionUtil;
+@SuppressWarnings("deprecation")
public class ServerUtil {
private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6");
@@ -133,19 +134,34 @@ public class ServerUtil {
return null;
}
- @SuppressWarnings("deprecation")
- public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, String tableName) throws IOException {
- String versionString = env.getHBaseVersion();
- int version = VersionUtil.encodeVersion(versionString);
- if (version >= COPROCESSOR_SCAN_WORKS) {
- // The following *should* work, but doesn't due to HBASE-11837 which was fixed in 0.98.6
- return env.getTable(TableName.valueOf(tableName));
- }
- // This code works around HBASE-11837
+ private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment env) {
+ return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= COPROCESSOR_SCAN_WORKS);
+ }
+
+ /*
+ * This code works around HBASE-11837 which causes HTableInterfaces retrieved from
+ * RegionCoprocessorEnvironment to not read local data.
+ */
+ private static HTableInterface getTableFromSingletonPool(RegionCoprocessorEnvironment env, byte[] tableName) {
// It's ok to not ever do a pool.close() as we're storing a single
// table only. The HTablePool holds no other resources that this table
// which will be closed itself when it's no longer needed.
+ @SuppressWarnings("resource")
HTablePool pool = new HTablePool(env.getConfiguration(),1);
return pool.getTable(tableName);
}
+
+ public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, HTableInterface writerTable) throws IOException {
+ if (coprocessorScanWorks(env)) {
+ return writerTable;
+ }
+ return getTableFromSingletonPool(env, writerTable.getTableName());
+ }
+
+ public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, byte[] tableName) throws IOException {
+ if (coprocessorScanWorks(env)) {
+ return env.getTable(TableName.valueOf(tableName));
+ }
+ return getTableFromSingletonPool(env, tableName);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index 136a997..9dfcce5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.expression.function.SumAggregateFunction;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.PLongColumn;
import org.apache.phoenix.schema.PName;
@@ -140,6 +141,16 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest {
@Override
public void explain(List<String> planSteps) {
}
+
+ @Override
+ public List<KeyRange> getSplits() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return Collections.emptyList();
+ }
};
ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index a135729..02fdcea 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -24,9 +24,10 @@ import java.sql.SQLException;
import java.util.*;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
-
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.AssertResults;
@@ -68,6 +69,15 @@ public class ConcatResultIteratorTest {
public void explain(List<String> planSteps) {
}
+ @Override
+ public List<KeyRange> getSplits() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return Collections.emptyList();
+ }
};
Tuple[] expectedResults = new Tuple[] {
@@ -118,6 +128,15 @@ public class ConcatResultIteratorTest {
public void explain(List<String> planSteps) {
}
+ @Override
+ public List<KeyRange> getSplits() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return Collections.emptyList();
+ }
};
ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
AssertResults.assertResults(scanner, expectedResults);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
index 9ff088e..095027c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
@@ -24,9 +24,10 @@ import java.sql.SQLException;
import java.util.*;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
-
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.AssertResults;
@@ -73,6 +74,15 @@ public class MergeSortResultIteratorTest {
public void explain(List<String> planSteps) {
}
+ @Override
+ public List<KeyRange> getSplits() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return Collections.emptyList();
+ }
};
ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
AssertResults.assertResults(scanner, expectedResults);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
index 1e3df0b..6139aa5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
@@ -25,6 +25,7 @@ import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.junit.Test;
@@ -183,5 +184,55 @@ public class QueryPlanTest extends BaseConnectionlessQueryTest {
}
}
}
+
+ @Test
+ public void testTenantSpecificConnWithLimit() throws Exception {
+ String baseTableDDL = "CREATE TABLE BASE_MULTI_TENANT_TABLE(\n " +
+ " tenant_id VARCHAR(5) NOT NULL,\n" +
+ " userid INTEGER NOT NULL,\n" +
+ " username VARCHAR NOT NULL,\n" +
+ " col VARCHAR\n " +
+ " CONSTRAINT pk PRIMARY KEY (tenant_id, userid, username)) MULTI_TENANT=true";
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute(baseTableDDL);
+ conn.close();
+
+ String tenantId = "tenantId";
+ String tenantViewDDL = "CREATE VIEW TENANT_VIEW AS SELECT * FROM BASE_MULTI_TENANT_TABLE";
+ Properties tenantProps = new Properties();
+ tenantProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ conn = DriverManager.getConnection(getUrl(), tenantProps);
+ conn.createStatement().execute(tenantViewDDL);
+
+ String query = "EXPLAIN SELECT * FROM TENANT_VIEW LIMIT 1";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertEquals("CLIENT SERIAL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" +
+ " SERVER FILTER BY PageFilter 1\n" +
+ " SERVER 1 ROW LIMIT\n" +
+ "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+ query = "EXPLAIN SELECT * FROM TENANT_VIEW LIMIT " + Integer.MAX_VALUE;
+ rs = conn.createStatement().executeQuery(query);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" +
+ " SERVER FILTER BY PageFilter " + Integer.MAX_VALUE + "\n" +
+ " SERVER " + Integer.MAX_VALUE + " ROW LIMIT\n" +
+ "CLIENT " + Integer.MAX_VALUE + " ROW LIMIT", QueryUtil.getExplainPlan(rs));
+ query = "EXPLAIN SELECT * FROM TENANT_VIEW WHERE username = 'Joe' LIMIT 1";
+ rs = conn.createStatement().executeQuery(query);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" +
+ " SERVER FILTER BY USERNAME = 'Joe'\n" +
+ " SERVER 1 ROW LIMIT\n" +
+ "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+ query = "EXPLAIN SELECT * FROM TENANT_VIEW WHERE col = 'Joe' LIMIT 1";
+ rs = conn.createStatement().executeQuery(query);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" +
+ " SERVER FILTER BY COL = 'Joe'\n" +
+ " SERVER 1 ROW LIMIT\n" +
+ "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+ }
+
+ @Test
+ public void testLimitOnTenantSpecific() throws Exception {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
index 2bff620..f6808a8 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
@@ -105,7 +105,7 @@ public final class PhoenixRecordReader extends RecordReader<NullWritable,Phoenix
PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
- ResultIterator iterator = ConcatResultIterator.newConcatResultIterator(iterators);
+ ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
}