You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/04 05:50:59 UTC
[2/6] PHOENIX-1251 Salted queries with range scan become full table
scans
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 84ae243..40a0cff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -230,9 +230,11 @@ public abstract class ExplainTable {
private void appendScanRow(StringBuilder buf, Bound bound) {
ScanRanges scanRanges = context.getScanRanges();
- KeyRange minMaxRange = context.getMinMaxRange();
+ // TODO: review this and potentially intersect the scan ranges
+ // with the minMaxRange in ScanRanges to prevent having to do all this.
+ KeyRange minMaxRange = scanRanges.getMinMaxRange();
Iterator<byte[]> minMaxIterator = Iterators.emptyIterator();
- if (minMaxRange != null) {
+ if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
RowKeySchema schema = tableRef.getTable().getRowKeySchema();
if (!minMaxRange.isUnbound(bound)) {
minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound));
@@ -262,8 +264,7 @@ public abstract class ExplainTable {
private void appendKeyRanges(StringBuilder buf) {
ScanRanges scanRanges = context.getScanRanges();
- KeyRange minMaxRange = context.getMinMaxRange();
- if (minMaxRange == null && (scanRanges == ScanRanges.EVERYTHING || scanRanges == ScanRanges.NOTHING)) {
+ if (scanRanges.isDegenerate() || scanRanges.isEverything()) {
return;
}
buf.append(" [");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
deleted file mode 100644
index 0448e46..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import java.sql.SQLException;
-
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.schema.PTable;
-
-
-/**
- * Factory class for the Region Splitter used by the project.
- */
-public class ParallelIteratorRegionSplitterFactory {
-
- public static ParallelIteratorRegionSplitter getSplitter(StatementContext context, PTable table, HintNode hintNode) throws SQLException {
- if (context.getScanRanges().useSkipScanFilter()) {
- return SkipRangeParallelIteratorRegionSplitter.getInstance(context, table, hintNode);
- }
- return DefaultParallelIteratorRegionSplitter.getInstance(context, table, hintNode);
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index edab575..da8c212 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -34,26 +33,28 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
@@ -62,6 +63,7 @@ import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;
@@ -71,6 +73,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -84,7 +87,10 @@ import com.google.common.collect.Lists;
*/
public class ParallelIterators extends ExplainTable implements ResultIterators {
private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
+ private final List<List<Scan>> scans;
private final List<KeyRange> splits;
+ private final PTable physicalTable;
+ private final QueryPlan plan;
private final ParallelIteratorFactory iteratorFactory;
public static interface ParallelIteratorFactory {
@@ -92,6 +98,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
}
private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
+ private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
@Override
@@ -100,10 +107,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
}
};
- public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement,
- RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory)
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
throws SQLException {
- super(context, tableRef, groupBy);
+ super(plan.getContext(), plan.getTableRef(), plan.getGroupBy());
+ this.plan = plan;
+ StatementContext context = plan.getContext();
+ TableRef tableRef = plan.getTableRef();
+ FilterableStatement statement = plan.getStatement();
+ RowProjector projector = plan.getProjector();
MetaDataClient client = new MetaDataClient(context.getConnection());
PTable physicalTable = tableRef.getTable();
String physicalName = tableRef.getTable().getPhysicalName().getString();
@@ -122,8 +133,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
.getTable(new PTableKey(null, physicalTableName));
}
}
- this.splits = getSplits(context, physicalTable, statement.getHint());
- this.iteratorFactory = iteratorFactory;
+ this.physicalTable = physicalTable;
Scan scan = context.getScan();
PTable table = tableRef.getTable();
if (projector.isProjectEmptyKeyValue()) {
@@ -148,17 +158,30 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
}
}
} else if (table.getViewType() == ViewType.MAPPED) {
- // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
- // selected column values are returned back to client
- for (PColumnFamily family : table.getColumnFamilies()) {
- scan.addFamily(family.getName().getBytes());
- }
- } // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
- if (limit != null) {
- ScanUtil.andFilterAtEnd(scan, new PageFilter(limit));
+ // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
+ // selected column values are returned back to client
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ scan.addFamily(family.getName().getBytes());
+ }
+ }
+
+ // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
+ if (perScanLimit != null) {
+ ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
}
doColumnProjectionOptimization(context, scan, table, statement);
+
+ this.iteratorFactory = iteratorFactory;
+ this.scans = getParallelScans(context.getScan());
+ List<List<Scan>> scans = getParallelScans(context.getScan());
+ List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
+ for (List<Scan> scanList : scans) {
+ for (Scan aScan : scanList) {
+ splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
+ }
+ }
+ this.splits = ImmutableList.copyOf(splitRanges);
}
private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
@@ -241,29 +264,210 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
}
}
+ public List<KeyRange> getSplits() {
+ return splits;
+ }
+
+ private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
+ int nBoundaries = regionLocations.size() - 1;
+ List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
+ for (int i = 0; i < nBoundaries; i++) {
+ HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
+ ranges.add(regionInfo.getEndKey());
+ }
+ return ranges;
+ }
+
+ private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
+ int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
+ // If we found an exact match, return the index+1, as the inclusiveKey will be contained
+ // in the next region (since we're matching on the end boundary).
+ guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+ return guideIndex;
+ }
+
+ private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
+ int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
+ // If we found an exact match, return the index we found as the exclusiveKey won't be
+ // contained in the next region as with getIndexContainingInclusive.
+ guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
+ return guideIndex;
+ }
+
+ private List<byte[]> getGuidePosts(PTable table) {
+ Scan scan = context.getScan();
+ boolean isPointLookup = context.getScanRanges().isPointLookup();
+ byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
+ List<byte[]> gps = Collections.emptyList();
+ /*
+ * Don't use guide posts if:
+ * 1) We're doing a point lookup, as HBase is fast enough at those
+ * to not need them to be further parallelized. TODO: perf test to verify
+ * 2) We're collecting stats, as in this case we need to scan entire
+ * regions worth of data to track where to put the guide posts.
+ */
+ if (!isPointLookup && !ScanUtil.isAnalyzeTable(scan)) {
+ if (table.getColumnFamilies().isEmpty()) {
+ // For sure we can get the defaultCF from the table
+ return table.getGuidePosts();
+ }
+ try {
+ if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
+ // If default CF is not used in scan, use first CF referenced in scan
+ return table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+ }
+ // Otherwise, favor use of default CF.
+ return table.getColumnFamily(defaultCF).getGuidePosts();
+ } catch (ColumnFamilyNotFoundException cfne) {
+ // Alter table does this
+ }
+ }
+ return gps;
+
+ }
+
+ private static String toString(List<byte[]> gps) {
+ StringBuilder buf = new StringBuilder(gps.size() * 100);
+ buf.append("[");
+ for (int i = 0; i < gps.size(); i++) {
+ buf.append(Bytes.toStringBinary(gps.get(i)));
+ buf.append(",");
+ if (i < gps.size()-1 && (i % 10) == 0) {
+ buf.append("\n");
+ }
+ }
+ buf.setCharAt(buf.length()-1, ']');
+ return buf.toString();
+ }
+
+ private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, boolean crossedRegionBoundary) {
+ if (scan == null) {
+ return scans;
+ }
+ if (!scans.isEmpty()) {
+ boolean startNewScanList = false;
+ if (!plan.isRowKeyOrdered()) {
+ startNewScanList = true;
+ } else if (crossedRegionBoundary) {
+ if (physicalTable.getBucketNum() != null) {
+ byte[] previousStartKey = scans.get(scans.size()-1).getStartRow();
+ byte[] currentStartKey = scan.getStartRow();
+ byte[] prefix = ScanUtil.getPrefix(previousStartKey, SaltingUtil.NUM_SALTING_BYTES);
+ startNewScanList = ScanUtil.crossesPrefixBoundary(currentStartKey, prefix, SaltingUtil.NUM_SALTING_BYTES);
+ }
+ }
+ if (startNewScanList) {
+ parallelScans.add(scans);
+ scans = Lists.newArrayListWithExpectedSize(1);
+ }
+ }
+ scans.add(scan);
+ return scans;
+ }
/**
- * Splits the given scan's key range so that each split can be queried in parallel
- * @param hintNode TODO
- *
- * @return the key ranges that should be scanned in parallel
+ * Compute the list of parallel scans to run for a given query. The inner scans
+ * may be concatenated together directly, while the outer ones may need to be
+ * merge sorted, depending on the query.
+ * @return list of parallel scans to run for a given query.
+ * @throws SQLException
*/
- // exposed for tests
- public static List<KeyRange> getSplits(StatementContext context, PTable table, HintNode hintNode) throws SQLException {
- return ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits();
+ private List<List<Scan>> getParallelScans(final Scan scan) throws SQLException {
+ List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+ .getAllTableRegions(physicalTable.getPhysicalName().getBytes());
+ List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+ ScanRanges scanRanges = context.getScanRanges();
+ boolean isSalted = physicalTable.getBucketNum() != null;
+ List<byte[]> gps = getGuidePosts(physicalTable);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Guideposts: " + toString(gps));
+ }
+ boolean traverseAllRegions = isSalted;
+
+ byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
+ byte[] currentKey = ByteUtil.EMPTY_BYTE_ARRAY;
+ byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+ int regionIndex = 0;
+ int stopIndex = regionBoundaries.size();
+ if (!traverseAllRegions) {
+ startKey = scan.getStartRow();
+ if (startKey.length > 0) {
+ currentKey = startKey;
+ regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+ }
+ stopKey = scan.getStopRow();
+ if (stopKey.length > 0) {
+ stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+ }
+ }
+ List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+
+ int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
+ int gpsSize = gps.size();
+ int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
+ int keyOffset = 0;
+ List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
+ // Merge bisect with guideposts for all but the last region
+ while (regionIndex <= stopIndex) {
+ byte[] currentGuidePost;
+ byte[] endKey = regionIndex == stopIndex ? stopKey : regionBoundaries.get(regionIndex);
+ while (guideIndex < gpsSize
+ && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+ Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset);
+ scans = addNewScan(parallelScans, scans, newScan, false);
+ currentKey = currentGuidePost;
+ guideIndex++;
+ }
+ Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset);
+ scans = addNewScan(parallelScans, scans, newScan, true);
+ currentKey = endKey;
+ regionIndex++;
+ }
+ if (!scans.isEmpty()) { // Add any remaining scans
+ parallelScans.add(scans);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("The parallelScans: " + parallelScans);
+ }
+ return parallelScans;
}
- private static List<KeyRange> toKeyRanges(List<HRegionLocation> regions) {
- List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(regions.size());
- for (HRegionLocation region : regions) {
- keyRanges.add(TO_KEY_RANGE.apply(region));
+ private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
+ if (!concatIterators.isEmpty()) {
+ if (concatIterators.size() == 1) {
+ iterators.add(concatIterators.get(0));
+ } else {
+ // TODO: should ConcatResultIterator have a constructor that takes
+ // a List<PeekingResultIterator>?
+ iterators.add(new ConcatResultIterator(new ResultIterators() {
+
+ @Override
+ public List<PeekingResultIterator> getIterators() throws SQLException {
+ return concatIterators;
+ }
+
+ @Override
+ public int size() {
+ return concatIterators.size();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ // TODO: review what we should do for explain plan here
+ concatIterators.get(0).explain(planSteps);
+ }
+
+ }));
+ }
}
- return keyRanges;
}
- public List<KeyRange> getSplits() {
- return splits;
+ public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
+ if (!reverse) {
+ return list;
+ }
+ return Lists.reverse(list);
}
-
+
/**
* Executes the scan in parallel across all regions, blocking until all scans are complete.
* @return the result iterators for the scan of each region
@@ -271,53 +475,54 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
@Override
public List<PeekingResultIterator> getIterators() throws SQLException {
boolean success = false;
+ boolean isReverse = ScanUtil.isReversed(context.getScan());
final ConnectionQueryServices services = context.getConnection().getQueryServices();
ReadOnlyProps props = services.getProps();
- int numSplits = splits.size();
+ int numSplits = size();
List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
- List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits);
+ List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+ // TODO: what purpose does this scanID serve?
final UUID scanId = UUID.randomUUID();
try {
- submitWork(scanId, splits, futures);
+ submitWork(scanId, scans, futures, splits.size());
int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
- final int factor = ScanUtil.isReversed(this.context.getScan()) ? -1 : 1;
- // Sort futures by row key so that we have a predictable order we're getting rows back for scans.
- // We're going to wait here until they're finished anyway and this makes testing much easier.
- Collections.sort(futures, new Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() {
- @Override
- public int compare(Pair<KeyRange, Future<PeekingResultIterator>> o1, Pair<KeyRange, Future<PeekingResultIterator>> o2) {
- return factor * Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange());
- }
- });
boolean clearedCache = false;
- byte[] tableName = tableRef.getTable().getPhysicalName().getBytes();
- for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
- try {
- PeekingResultIterator iterator = future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
- iterators.add(iterator);
- } catch (ExecutionException e) {
- try { // Rethrow as SQLException
- throw ServerUtil.parseServerException(e);
- } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
- List<Pair<KeyRange,Future<PeekingResultIterator>>> newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2);
- if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
- services.clearTableRegionCache(tableName);
- clearedCache = true;
- }
- List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName));
- // Intersect what was the expected boundary with all new region boundaries and
- // resubmit just this portion of work again
- List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits);
- submitWork(scanId, newSubSplits, newFutures);
- for (Pair<KeyRange,Future<PeekingResultIterator>> newFuture : newFutures) {
- // Immediate do a get (not catching exception again) and then add the iterators we
- // get back immediately. They'll be sorted as expected, since they're replacing the
- // original one.
- PeekingResultIterator iterator = newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
- iterators.add(iterator);
+ for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
+ List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
+ for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
+ try {
+ PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ concatIterators.add(iterator);
+ } catch (ExecutionException e) {
+ try { // Rethrow as SQLException
+ throw ServerUtil.parseServerException(e);
+ } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+ List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
+ if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+ services.clearTableRegionCache(physicalTable.getName().getBytes());
+ clearedCache = true;
+ }
+ // Resubmit just this portion of work again
+ Scan oldScan = scanPair.getFirst();
+ List<List<Scan>> newNestedScans = this.getParallelScans(oldScan);
+ // Add any concatIterators that were successful so far
+ // as we need these to be in order
+ addConcatResultIterator(iterators, concatIterators);
+ concatIterators = Collections.emptyList();
+ submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
+ for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
+ for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
+ // Immediately do a get (not catching exception again) and then add the iterators we
+ // get back immediately. They'll be sorted as expected, since they're replacing the
+ // original one.
+ PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ iterators.add(iterator);
+ }
+ }
}
}
}
+ addConcatResultIterator(iterators, concatIterators);
}
success = true;
@@ -337,64 +542,80 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
}
}
- private void submitWork(final UUID scanId, List<KeyRange> splits,
- List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) {
+ private static final class ScanLocation {
+ private final int outerListIndex;
+ private final int innerListIndex;
+ private final Scan scan;
+ public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) {
+ this.outerListIndex = outerListIndex;
+ this.innerListIndex = innerListIndex;
+ this.scan = scan;
+ }
+ public int getOuterListIndex() {
+ return outerListIndex;
+ }
+ public int getInnerListIndex() {
+ return innerListIndex;
+ }
+ public Scan getScan() {
+ return scan;
+ }
+ }
+ private void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+ List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
final ConnectionQueryServices services = context.getConnection().getQueryServices();
ExecutorService executor = services.getExecutor();
- for (final KeyRange split : splits) {
- final Scan splitScan = ScanUtil.newScan(context.getScan());
- // Intersect with existing start/stop key if the table is salted
- // If not salted, we've already intersected it. If salted, we need
- // to wait until now to intersect, as we're running parallel scans
- // on all the possible regions here.
- if (tableRef.getTable().getBucketNum() != null) {
- KeyRange minMaxRange = context.getMinMaxRange();
- if (minMaxRange != null) {
- // Add salt byte based on current split, as minMaxRange won't have it
- minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
- // FIXME: seems like this should be possible when we set the scan start/stop
- // in StatementContext.setScanRanges(). If it doesn't intersect the range for
- // one salt byte, I don't see how it could intersect it with any of them.
- if (!ScanUtil.intersectScanRange(splitScan, minMaxRange.getLowerRange(), minMaxRange.getUpperRange())) {
- continue; // Skip this chunk if no intersection based on minMaxRange
- }
- }
+ // Pre-populate nestedFutures lists so that we can shuffle the scans
+ // and add the future to the right nested list. By shuffling the scans
+ // we get better utilization of the cluster since our thread executor
+ // will spray the scans across machines as opposed to targeting a
+ // single one since the scans are in row key order.
+ List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
+ for (int i = 0; i < nestedScans.size(); i++) {
+ List<Scan> scans = nestedScans.get(i);
+ List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
+ nestedFutures.add(futures);
+ for (int j = 0; j < scans.size(); j++) {
+ Scan scan = nestedScans.get(i).get(j);
+ scanLocations.add(new ScanLocation(scan, i, j));
+ futures.add(null); // placeholder
}
- if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
- // Delay the swapping of start/stop row until row so we don't muck with the intersect logic
- ScanUtil.swapStartStopRowIfReversed(splitScan);
- Future<PeekingResultIterator> future =
- executor.submit(new JobCallable<PeekingResultIterator>() {
+ }
+ Collections.shuffle(scanLocations);
+ for (ScanLocation scanLocation : scanLocations) {
+ final Scan scan = scanLocation.getScan();
+ Future<PeekingResultIterator> future =
+ executor.submit(new JobCallable<PeekingResultIterator>() {
- @Override
- public PeekingResultIterator call() throws Exception {
- long startTime = System.currentTimeMillis();
- ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan);
- if (logger.isDebugEnabled()) {
- logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
- }
- return iteratorFactory.newIterator(context, scanner, splitScan);
+ @Override
+ public PeekingResultIterator call() throws Exception {
+ long startTime = System.currentTimeMillis();
+ ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan);
}
+ return iteratorFactory.newIterator(context, scanner, scan);
+ }
- /**
- * Defines the grouping for round robin behavior. All threads spawned to process
- * this scan will be grouped together and time sliced with other simultaneously
- * executing parallel scans.
- */
- @Override
- public Object getJobId() {
- return ParallelIterators.this;
- }
- });
- futures.add(new Pair<KeyRange,Future<PeekingResultIterator>>(split,future));
- }
+ /**
+ * Defines the grouping for round robin behavior. All threads spawned to process
+ * this scan will be grouped together and time sliced with other simultaneously
+ * executing parallel scans.
+ */
+ @Override
+ public Object getJobId() {
+ return ParallelIterators.this;
+ }
+ });
+ // Add our future in the right place so that we can concatenate the
+ // results of the inner futures versus merge sorting across all of them.
+ nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(), new Pair<Scan,Future<PeekingResultIterator>>(scan,future));
}
-
}
@Override
public int size() {
- return this.splits.size();
+ return this.scans.size();
}
@Override
@@ -403,4 +624,9 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
buf.append("CLIENT PARALLEL " + size() + "-WAY ");
explain(buf.toString(),planSteps);
}
+
+ @Override
+ public String toString() {
+ return "ParallelIterators [scans=" + scans + "]";
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
deleted file mode 100644
index 81f5af6..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SaltingUtil;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-
-/**
- * Split the region according to the information contained in the scan's SkipScanFilter.
- */
-public class SkipRangeParallelIteratorRegionSplitter extends DefaultParallelIteratorRegionSplitter {
-
- public static SkipRangeParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) {
- return new SkipRangeParallelIteratorRegionSplitter(context, table, hintNode);
- }
-
- protected SkipRangeParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) {
- super(context, table, hintNode);
- }
-
- @Override
- protected List<HRegionLocation> getAllRegions() throws SQLException {
- List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes());
- return filterRegions(allTableRegions, context.getScanRanges());
- }
-
- public List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, final ScanRanges ranges) {
- Iterable<HRegionLocation> regions;
- if (ranges == ScanRanges.EVERYTHING) {
- return allTableRegions;
- } else if (ranges == ScanRanges.NOTHING) { // TODO: why not emptyList?
- return Lists.<HRegionLocation>newArrayList();
- } else {
- regions = Iterables.filter(allTableRegions,
- new Predicate<HRegionLocation>() {
- @Override
- public boolean apply(HRegionLocation region) {
- KeyRange minMaxRange = context.getMinMaxRange();
- if (minMaxRange != null) {
- KeyRange range = KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
- if (table.getBucketNum() != null) {
- // Add salt byte, as minMaxRange won't have it
- minMaxRange = SaltingUtil.addSaltByte(region.getRegionInfo().getStartKey(), minMaxRange);
- }
- range = range.intersect(minMaxRange);
- return ranges.intersect(range.getLowerRange(), range.getUpperRange());
- }
- return ranges.intersect(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
- }
- });
- }
- return Lists.newArrayList(regions);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 2840dca..e396b22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -426,6 +426,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
public boolean isDegenerate() {
return false;
}
+
+ @Override
+ public boolean isRowKeyOrdered() {
+ return true;
+ }
};
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
index 68f786a..afcc741 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
@@ -210,6 +210,10 @@ public class KeyRange implements Writable {
return compareLowerToUpperBound(b,o,l,true);
}
+ public int compareLowerToUpperBound( byte[] b) {
+ return compareLowerToUpperBound(b,0,b.length);
+ }
+
/**
* Compares a lower bound against an upper bound
* @param b upper bound byte array
@@ -237,6 +241,10 @@ public class KeyRange implements Writable {
return 1;
}
+ public int compareUpperToLowerBound(byte[] b) {
+ return compareUpperToLowerBound(b,0,b.length);
+ }
+
public int compareUpperToLowerBound(byte[] b, int o, int l) {
return compareUpperToLowerBound(b,o,l, true);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
deleted file mode 100644
index df55fb5..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.query;
-
-import java.sql.SQLException;
-
-import org.apache.phoenix.schema.TableRef;
-
-
-/**
- *
- * Interface for managing and caching table statistics.
- * The frequency of updating the table statistics are controlled
- * by {@link org.apache.phoenix.query.QueryServices#STATS_UPDATE_FREQ_MS_ATTRIB}.
- * Table stats may also be manually updated through {@link #updateStats(TableRef)}.
- *
- *
- *
- * @since 0.1
- */
-public interface StatsManager {
- /**
- * Get the minimum key for the given table
- * @param table the table
- * @return the minimum key or null if unknown
- */
- byte[] getMinKey(TableRef table);
-
- /**
- * Get the maximum key for the given table
- * @param table the table
- * @return the maximum key or null if unknown
- */
- byte[] getMaxKey(TableRef table);
-
- /**
- * Manually update the cached table statistics
- * @param table the table
- * @throws SQLException
- */
- void updateStats(TableRef table) throws SQLException;
-
- void clearStats() throws SQLException;
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
deleted file mode 100644
index 1ab7df5..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.query;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TimeKeeper;
-
-
-/**
- *
- * Implementation of StatsManager. Table stats are updated asynchronously when they're
- * accessed and past time-to-live. In this case, future calls (after the asynchronous
- * call has completed), will have the updated stats.
- *
- * All tables share the same HBase connection for a given connection and each connection
- * will have it's own cache for these stats. This isn't ideal and will get reworked when
- * the schema is kept on the server side. It's ok for now because:
- * 1) we only ask the server for these stats when the start/end region is queried against
- * 2) the query to get the stats pulls a single row so it's very cheap
- * 3) it's async and if it takes too long it won't lead to anything except less optimal
- * parallelization.
- *
- *
- * @since 0.1
- */
-public class StatsManagerImpl implements StatsManager {
- private final ConnectionQueryServices services;
- private final int statsUpdateFrequencyMs;
- private final int maxStatsAgeMs;
- private final TimeKeeper timeKeeper;
- private final ConcurrentMap<String,PTableStats> tableStatsMap = new ConcurrentHashMap<String,PTableStats>();
-
- public StatsManagerImpl(ConnectionQueryServices services, int statsUpdateFrequencyMs, int maxStatsAgeMs) {
- this(services, statsUpdateFrequencyMs, maxStatsAgeMs, TimeKeeper.SYSTEM);
- }
-
- public StatsManagerImpl(ConnectionQueryServices services, int statsUpdateFrequencyMs, int maxStatsAgeMs, TimeKeeper timeKeeper) {
- this.services = services;
- this.statsUpdateFrequencyMs = statsUpdateFrequencyMs;
- this.maxStatsAgeMs = maxStatsAgeMs;
- this.timeKeeper = timeKeeper;
- }
-
- public long getStatsUpdateFrequency() {
- return statsUpdateFrequencyMs;
- }
-
- @Override
- public void updateStats(TableRef tableRef) throws SQLException {
- SQLException sqlE = null;
- HTableInterface hTable = services.getTable(tableRef.getTable().getPhysicalName().getBytes());
- try {
- byte[] minKey = null, maxKey = null;
- // Do a key-only scan to get the first row of a table. This is the min
- // key for the table.
- Scan scan = new Scan(HConstants.EMPTY_START_ROW, new KeyOnlyFilter());
- ResultScanner scanner = hTable.getScanner(scan);
- try {
- Result r = scanner.next();
- if (r != null) {
- minKey = r.getRow();
- }
- } finally {
- scanner.close();
- }
- int maxPossibleKeyLength = SchemaUtil.estimateKeyLength(tableRef.getTable());
- byte[] maxPossibleKey = new byte[maxPossibleKeyLength];
- Arrays.fill(maxPossibleKey, (byte)255);
- // Use this deprecated method to get the key "before" the max possible key value,
- // which is the max key for a table.
- @SuppressWarnings("deprecation")
- Result r = hTable.getRowOrBefore(maxPossibleKey, tableRef.getTable().getColumnFamilies().iterator().next().getName().getBytes());
- if (r != null) {
- maxKey = r.getRow();
- }
- tableStatsMap.put(tableRef.getTable().getName().getString(), new PTableStats(timeKeeper.getCurrentTime(),minKey,maxKey));
- } catch (IOException e) {
- sqlE = ServerUtil.parseServerException(e);
- } finally {
- try {
- hTable.close();
- } catch (IOException e) {
- if (sqlE == null) {
- sqlE = ServerUtil.parseServerException(e);
- } else {
- sqlE.setNextException(ServerUtil.parseServerException(e));
- }
- } finally {
- if (sqlE != null) {
- throw sqlE;
- }
- }
- }
- }
-
- private PTableStats getStats(final TableRef table) {
- PTableStats stats = tableStatsMap.get(table);
- if (stats == null) {
- PTableStats newStats = new PTableStats();
- stats = tableStatsMap.putIfAbsent(table.getTable().getName().getString(), newStats);
- stats = stats == null ? newStats : stats;
- }
- // Synchronize on the current stats for a table to prevent
- // multiple attempts to update the stats.
- synchronized (stats) {
- long initiatedTime = stats.getInitiatedTime();
- long currentTime = timeKeeper.getCurrentTime();
- // Update stats asynchronously if they haven't been updated within the specified frequency.
- // We update asynchronously because we don't ever want to block the caller - instead we'll continue
- // to use the old one.
- if ( currentTime - initiatedTime >= getStatsUpdateFrequency()) {
- stats.setInitiatedTime(currentTime);
- services.getExecutor().submit(new Callable<Void>() {
-
- @Override
- public Void call() throws Exception { // TODO: will exceptions be logged?
- updateStats(table);
- return null;
- }
-
- });
- }
- // If the stats are older than the max age, use an empty stats
- if (currentTime - stats.getCompletedTime() >= maxStatsAgeMs) {
- return PTableStats.NO_STATS;
- }
- }
- return stats;
- }
-
- @Override
- public byte[] getMinKey(TableRef table) {
- PTableStats stats = getStats(table);
- return stats.getMinKey();
- }
-
- @Override
- public byte[] getMaxKey(TableRef table) {
- PTableStats stats = getStats(table);
- return stats.getMaxKey();
- }
-
- private static class PTableStats {
- private static final PTableStats NO_STATS = new PTableStats();
- private long initiatedTime;
- private final long completedTime;
- private final byte[] minKey;
- private final byte[] maxKey;
-
- public PTableStats() {
- this(-1,null,null);
- }
- public PTableStats(long completedTime, byte[] minKey, byte[] maxKey) {
- this.minKey = minKey;
- this.maxKey = maxKey;
- this.completedTime = this.initiatedTime = completedTime;
- }
-
- private byte[] getMinKey() {
- return minKey;
- }
-
- private byte[] getMaxKey() {
- return maxKey;
- }
-
- private long getCompletedTime() {
- return completedTime;
- }
-
- private void setInitiatedTime(long initiatedTime) {
- this.initiatedTime = initiatedTime;
- }
-
- private long getInitiatedTime() {
- return initiatedTime;
- }
- }
-
- @Override
- public void clearStats() throws SQLException {
- tableStatsMap.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
index f9347a4..536e5bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
@@ -54,8 +55,7 @@ public class StatisticsCollector {
private Map<String, byte[]> minMap = Maps.newHashMap();
private Map<String, byte[]> maxMap = Maps.newHashMap();
private long guidepostDepth;
- private long byteCount = 0;
- private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap();
+ private Map<String, Pair<Integer,List<byte[]>>> guidePostsMap = Maps.newHashMap();
private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
protected StatisticsTable statsTable;
private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
@@ -131,7 +131,6 @@ public class StatisticsCollector {
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMore = true;
while (hasMore) {
- // Am getting duplicates here. Need to avoid that
hasMore = scanner.next(results);
collectStatistics(results);
count += results.size();
@@ -280,18 +279,21 @@ public class StatisticsCollector {
maxMap.put(fam, row);
}
}
- byteCount += kv.getLength();
// TODO : This can be moved to an interface so that we could collect guide posts in different ways
+ Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
+ if (gps == null) {
+ gps = new Pair<Integer,List<byte[]>>(0, Lists.<byte[]>newArrayList());
+ guidePostsMap.put(fam, gps);
+ }
+ int byteCount = gps.getFirst() + kv.getLength();
+ gps.setFirst(byteCount);
if (byteCount >= guidepostDepth) {
- if (guidePostsMap.get(fam) != null) {
- guidePostsMap.get(fam).add(row);
- } else {
- List<byte[]> guidePosts = new ArrayList<byte[]>();
- guidePosts.add(row);
- guidePostsMap.put(fam, guidePosts);
+ // Prevent dups
+ List<byte[]> gpsKeys = gps.getSecond();
+ if (gpsKeys.isEmpty() || Bytes.compareTo(row, gpsKeys.get(gpsKeys.size()-1)) > 0) {
+ gpsKeys.add(row);
+ gps.setFirst(0); // Only reset count when adding guidepost
}
- // reset the count for the next key
- byteCount = 0;
}
}
@@ -307,16 +309,19 @@ public class StatisticsCollector {
public byte[] getGuidePosts(String fam) {
if (!guidePostsMap.isEmpty()) {
- List<byte[]> guidePosts = guidePostsMap.get(fam);
- if (guidePosts != null) {
- byte[][] array = new byte[guidePosts.size()][];
- int i = 0;
- for (byte[] element : guidePosts) {
- array[i] = element;
- i++;
+ Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
+ if (gps != null) {
+ List<byte[]> guidePosts = gps.getSecond();
+ if (!guidePosts.isEmpty()) {
+ byte[][] array = new byte[guidePosts.size()][];
+ int i = 0;
+ for (byte[] element : guidePosts) {
+ array[i] = element;
+ i++;
+ }
+ PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array);
+ return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
}
- PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array);
- return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
}
}
return null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index 2ea8a13..5561002 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ByteUtil;
/**
* Wrapper to access the statistics table SYSTEM.STATS using the HTable.
@@ -56,14 +57,12 @@ public class StatisticsTable implements Closeable {
if (table == null) {
// Map the statics table and the table with which the statistics is
// associated. This is a workaround
- HTablePool pool = new HTablePool(conf, 1);
- try {
- HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
- table = new StatisticsTable(hTable);
- tableMap.put(primaryTableName, table);
- } finally {
- pool.close();
- }
+ HTablePool pool = new HTablePool(conf,100);
+ //HTable hTable = new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+ HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+ //h.setAutoFlushTo(true);
+ table = new StatisticsTable(hTable);
+ tableMap.put(primaryTableName, table);
}
return table;
}
@@ -132,6 +131,9 @@ public class StatisticsTable implements Closeable {
currentTime, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
currentTime, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+ // Add our empty column value so queries behave correctly
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+ currentTime, ByteUtil.EMPTY_BYTE_ARRAY);
mutations.add(put);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 0d69c8b..c0da0bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -41,7 +41,6 @@ import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.KeyRange.Bound;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.RowKeySchema;
import com.google.common.collect.Lists;
@@ -55,8 +54,8 @@ import com.google.common.collect.Lists;
* @since 0.1
*/
public class ScanUtil {
-
public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1];
+ private static final byte[] ZERO_BYTE_ARRAY = new byte[1024];
private ScanUtil() {
}
@@ -210,7 +209,7 @@ public class ScanUtil {
private static byte[] getKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] slotSpan, Bound bound) {
if (slots.isEmpty()) {
- return null;
+ return KeyRange.UNBOUND;
}
int[] position = new int[slots.size()];
int maxLength = 0;
@@ -222,7 +221,7 @@ public class ScanUtil {
byte[] key = new byte[maxLength];
int length = setKey(schema, slots, slotSpan, position, bound, key, 0, 0, position.length);
if (length == 0) {
- return null;
+ return KeyRange.UNBOUND;
}
if (length == maxLength) {
return key;
@@ -385,9 +384,35 @@ public class ScanUtil {
return keyRanges;
}
- public static byte[] nextKey(byte[] key, PTable table, ImmutableBytesWritable ptr) {
+ /**
+ * Converts a partially qualified KeyRange into a KeyRange with an
+ * inclusive lower bound and an exclusive upper bound, widening
+ * as necessary.
+ */
+ public static KeyRange convertToInclusiveExclusiveRange (KeyRange partialRange, RowKeySchema schema, ImmutableBytesWritable ptr) {
+ // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
+ // what we need to intersect against for the HBase scan.
+ byte[] lowerRange = partialRange.getLowerRange();
+ if (!partialRange.lowerUnbound()) {
+ if (!partialRange.isLowerInclusive()) {
+ lowerRange = ScanUtil.nextKey(lowerRange, schema, ptr);
+ }
+ }
+
+ byte[] upperRange = partialRange.getUpperRange();
+ if (!partialRange.upperUnbound()) {
+ if (partialRange.isUpperInclusive()) {
+ upperRange = ScanUtil.nextKey(upperRange, schema, ptr);
+ }
+ }
+ if (partialRange.getLowerRange() != lowerRange || partialRange.getUpperRange() != upperRange) {
+ partialRange = KeyRange.getKeyRange(lowerRange, upperRange);
+ }
+ return partialRange;
+ }
+
+ private static byte[] nextKey(byte[] key, RowKeySchema schema, ImmutableBytesWritable ptr) {
int pos = 0;
- RowKeySchema schema = table.getRowKeySchema();
int maxOffset = schema.iterator(key, ptr);
while (schema.next(ptr, pos, maxOffset) != null) {
pos++;
@@ -425,7 +450,7 @@ public class ScanUtil {
byte[] reversed = scan.getAttribute(REVERSED_ATTR);
return (PDataType.TRUE_BYTES.equals(reversed));
}
-
+
public static int[] getDefaultSlotSpans(int nSlots) {
return new int[nSlots];
}
@@ -467,4 +492,31 @@ public class ScanUtil {
public static boolean isAnalyzeTable(Scan scan) {
return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null;
}
+
+ public static boolean crossesPrefixBoundary(byte[] key, byte[] prefixBytes, int prefixLength) {
+ if (key.length < prefixLength) {
+ return true;
+ }
+ if (prefixBytes.length >= prefixLength) {
+ return Bytes.compareTo(prefixBytes, 0, prefixLength, key, 0, prefixLength) != 0;
+ }
+ return hasNonZeroLeadingBytes(key, prefixLength);
+ }
+
+ public static byte[] getPrefix(byte[] startKey, int prefixLength) {
+ // If startKey is at beginning, then our prefix will be a null padded byte array
+ return startKey.length >= prefixLength ? startKey : ByteUtil.EMPTY_BYTE_ARRAY;
+ }
+
+ private static boolean hasNonZeroLeadingBytes(byte[] key, int nBytesToCheck) {
+ if (nBytesToCheck > ZERO_BYTE_ARRAY.length) {
+ do {
+ if (Bytes.compareTo(key, nBytesToCheck - ZERO_BYTE_ARRAY.length, ZERO_BYTE_ARRAY.length, ScanUtil.ZERO_BYTE_ARRAY, 0, ScanUtil.ZERO_BYTE_ARRAY.length) != 0) {
+ return true;
+ }
+ nBytesToCheck -= ZERO_BYTE_ARRAY.length;
+ } while (nBytesToCheck > ZERO_BYTE_ARRAY.length);
+ }
+ return Bytes.compareTo(key, 0, nBytesToCheck, ZERO_BYTE_ARRAY, 0, nBytesToCheck) != 0;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
new file mode 100644
index 0000000..be90399
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.ScanUtil;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class ScanRangesIntersectTest {
+
+ @Test
+ public void testPointLookupIntersect() throws Exception {
+ RowKeySchema schema = schema();
+ int[] slotSpan = ScanUtil.SINGLE_COLUMN_SLOT_SPAN;
+ List<KeyRange> keys = points("a","j","m","z");
+ ScanRanges ranges = ScanRanges.create(schema, Collections.singletonList(keys), slotSpan);
+ assertIntersect(ranges, "b", "l", "j");
+
+ }
+
+ private static void assertIntersect(ScanRanges ranges, String lowerRange, String upperRange, String... expectedPoints) {
+ List<KeyRange> expectedKeys = points(expectedPoints);
+ Collections.sort(expectedKeys,KeyRange.COMPARATOR);
+ Scan scan = new Scan();
+ scan.setFilter(ranges.getSkipScanFilter());
+ byte[] startKey = lowerRange == null ? KeyRange.UNBOUND : PDataType.VARCHAR.toBytes(lowerRange);
+ byte[] stopKey = upperRange == null ? KeyRange.UNBOUND : PDataType.VARCHAR.toBytes(upperRange);
+ Scan newScan = ranges.intersectScan(scan, startKey, stopKey, 0);
+ if (expectedPoints.length == 0) {
+ assertNull(newScan);
+ } else {
+ assertNotNull(newScan);
+ SkipScanFilter filter = (SkipScanFilter)newScan.getFilter();
+ assertEquals(expectedKeys, filter.getSlots().get(0));
+ }
+ }
+
+ private static List<KeyRange> points(String... points) {
+ List<KeyRange> keys = Lists.newArrayListWithExpectedSize(points.length);
+ for (String point : points) {
+ keys.add(KeyRange.getKeyRange(PDataType.VARCHAR.toBytes(point)));
+ }
+ return keys;
+ }
+
+ private static RowKeySchema schema() {
+ RowKeySchemaBuilder builder = new RowKeySchemaBuilder(1);
+ builder.addField(new PDatum() {
+ @Override
+ public boolean isNullable() {
+ return false;
+ }
+ @Override
+ public PDataType getDataType() {
+ return PDataType.VARCHAR;
+ }
+ @Override
+ public Integer getMaxLength() {
+ return null;
+ }
+ @Override
+ public Integer getScale() {
+ return null;
+ }
+ @Override
+ public SortOrder getSortOrder() {
+ return SortOrder.getDefault();
+ }
+ }, false, SortOrder.getDefault());
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
index cd88ce7..695c4c9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
@@ -72,7 +72,7 @@ public class ScanRangesTest {
// incrementing the key too much.
upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey);
}
- assertEquals(expectedResult, scanRanges.intersect(lowerInclusiveKey,upperExclusiveKey));
+ assertEquals(expectedResult, scanRanges.intersects(lowerInclusiveKey,upperExclusiveKey,0));
}
@Parameters(name="{0} {2}")
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
index 3c0a952..063728c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
@@ -165,17 +165,22 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest {
KeyRange.getKeyRange(startKey2)));
if (Bytes.compareTo(startKey1, startKey2) > 0) {
expectedStartKey = startKey2;
- expectedEndKey = ByteUtil.concat(startKey1, QueryConstants.SEPARATOR_BYTE_ARRAY);
+ expectedEndKey = startKey1;
Collections.reverse(expectedRanges.get(0));
} else {
expectedStartKey = startKey1;
- expectedEndKey = ByteUtil.concat(startKey2, QueryConstants.SEPARATOR_BYTE_ARRAY);;
+ expectedEndKey = startKey2;
}
- assertTrue(Bytes.compareTo(expectedStartKey, startKey) == 0);
- assertTrue(Bytes.compareTo(expectedEndKey, stopKey) == 0);
+ assertEquals(0,startKey.length);
+ assertEquals(0,stopKey.length);
assertNotNull(filter);
assertTrue(filter instanceof SkipScanFilter);
+ SkipScanFilter skipScanFilter = (SkipScanFilter)filter;
+ assertEquals(1,skipScanFilter.getSlots().size());
+ assertEquals(2,skipScanFilter.getSlots().get(0).size());
+ assertArrayEquals(expectedStartKey, skipScanFilter.getSlots().get(0).get(0).getLowerRange());
+ assertArrayEquals(expectedEndKey, skipScanFilter.getSlots().get(0).get(1).getLowerRange());
StatementContext context = plan.getContext();
ScanRanges scanRanges = context.getScanRanges();
List<List<KeyRange>> ranges = scanRanges.getRanges();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
index bd19663..032768b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
@@ -1185,9 +1185,8 @@ public class WhereOptimizerTest extends BaseConnectionlessQueryTest {
StatementContext context = compileStatement(query, binds);
Scan scan = context.getScan();
Filter filter = scan.getFilter();
- assertNotNull(filter);
- assertTrue(filter instanceof SkipScanFilter);
- byte[] expectedStartRow = ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId));
+ assertNull(filter);
+ byte[] expectedStartRow = ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId2));
byte[] expectedStopRow = ByteUtil.concat(ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId2)), QueryConstants.SEPARATOR_BYTE_ARRAY);
assertArrayEquals(expectedStartRow, scan.getStartRow());
assertArrayEquals(expectedStopRow, scan.getStopRow());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
index ff31f7c..8ac322f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
@@ -27,6 +27,7 @@ import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.MULTI_CF_NAME;
import static org.apache.phoenix.util.TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+import static org.apache.phoenix.util.TestUtil.PTSDB3_NAME;
import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -103,6 +104,7 @@ public class BaseConnectionlessQueryTest extends BaseTest {
ensureTableCreated(getUrl(), ENTITY_HISTORY_TABLE_NAME);
ensureTableCreated(getUrl(), FUNKY_NAME);
ensureTableCreated(getUrl(), PTSDB_NAME);
+ ensureTableCreated(getUrl(), PTSDB3_NAME);
ensureTableCreated(getUrl(), MULTI_CF_NAME);
ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME);
ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME);
@@ -110,7 +112,6 @@ public class BaseConnectionlessQueryTest extends BaseTest {
ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME);
ensureTableCreated(getUrl(), TABLE_WITH_ARRAY);
Properties props = new Properties();
- //props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_TABLE_TIMESTAMP));
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP));
PhoenixConnection conn = DriverManager.getConnection(PHOENIX_CONNECTIONLESS_JDBC_URL, props).unwrap(PhoenixConnection.class);
try {