Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 02:17:08 UTC
[3/7] phoenix git commit: PHOENIX-1432 Run limit query that has only leading PK column filter serially
PHOENIX-1432 Run limit query that has only leading PK column filter serially
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e77cded0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e77cded0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e77cded0
Branch: refs/heads/4.2
Commit: e77cded06a8b137a3c99b5cbc6982058119437c9
Parents: 05d23cb
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 10:02:20 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 17:03:57 2014 -0800
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/HashJoinIT.java | 6 +-
.../org/apache/phoenix/end2end/KeyOnlyIT.java | 49 +-
.../phoenix/end2end/StatsCollectorIT.java | 16 +-
.../MutatingParallelIteratorFactory.java | 2 +-
.../apache/phoenix/compile/QueryCompiler.java | 2 +-
.../coprocessor/MetaDataEndpointImpl.java | 4 +-
.../apache/phoenix/execute/AggregatePlan.java | 2 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 2 +-
.../org/apache/phoenix/execute/ScanPlan.java | 47 +-
.../phoenix/iterate/BaseResultIterators.java | 622 +++++++++++++++++++
.../phoenix/iterate/ChunkedResultIterator.java | 10 +-
.../phoenix/iterate/ConcatResultIterator.java | 37 +-
.../phoenix/iterate/DelegateResultIterator.java | 4 +
.../iterate/LimitingPeekingResultIterator.java | 47 ++
.../iterate/ParallelIteratorFactory.java | 27 +
.../phoenix/iterate/ParallelIterators.java | 580 +----------------
.../apache/phoenix/iterate/ResultIterators.java | 7 +-
.../apache/phoenix/iterate/SerialIterators.java | 115 ++++
.../phoenix/iterate/SpoolingResultIterator.java | 1 -
.../apache/phoenix/optimize/QueryOptimizer.java | 2 +-
.../phoenix/query/QueryServicesOptions.java | 1 +
.../schema/stats/StatisticsCollector.java | 23 +-
.../phoenix/schema/stats/StatisticsUtil.java | 17 +
.../phoenix/schema/stats/StatisticsWriter.java | 30 +-
.../org/apache/phoenix/util/SchemaUtil.java | 24 +
.../org/apache/phoenix/util/ServerUtil.java | 34 +-
.../iterate/AggregateResultScannerTest.java | 11 +
.../iterate/ConcatResultIteratorTest.java | 21 +-
.../iterate/MergeSortResultIteratorTest.java | 12 +-
.../org/apache/phoenix/query/QueryPlanTest.java | 51 ++
.../phoenix/pig/hadoop/PhoenixRecordReader.java | 2 +-
31 files changed, 1145 insertions(+), 663 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 3850ac9..304a062 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -395,7 +395,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
* LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id
* LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
*/
- "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+ "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
" SERVER FILTER BY PageFilter 4\n" +
" SERVER 4 ROW LIMIT\n" +
"CLIENT 4 ROW LIMIT\n" +
@@ -745,7 +745,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
* LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id
* LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
*/
- "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+ "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
" SERVER FILTER BY PageFilter 4\n" +
" SERVER 4 ROW LIMIT\n" +
"CLIENT 4 ROW LIMIT\n" +
@@ -1116,7 +1116,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
* LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id
* LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
*/
- "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+ "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
" SERVER FILTER BY PageFilter 4\n" +
" SERVER 4 ROW LIMIT\n" +
"CLIENT 4 ROW LIMIT\n" +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
index b7e3314..42ee5ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
@@ -37,7 +37,9 @@ import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -51,7 +53,8 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
public static void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
// Must update config before starting server
- props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+ props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50));
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(100));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -79,7 +82,7 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
assertEquals(4, rs.getInt(2));
assertFalse(rs.next());
List<KeyRange> splits = getAllSplits(conn5, "KEYONLY");
- assertEquals(3, splits.size());
+ assertEquals(2, splits.size());
conn5.close();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+60));
@@ -159,6 +162,31 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
conn5.close();
}
+ @Test
+ public void testQueryWithLimitAndStats() throws Exception {
+ long ts = nextTimestamp();
+ ensureTableCreated(getUrl(),KEYONLY_NAME,null, ts);
+ initTableValues(ts+1, 100);
+
+ TestUtil.analyzeTable(getUrl(), ts+10, KEYONLY_NAME);
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+50));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String query = "SELECT i1 FROM KEYONLY LIMIT 1";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertEquals("CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY\n" +
+ " SERVER FILTER BY PageFilter 1\n" +
+ " SERVER 1 ROW LIMIT\n" +
+ "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+ conn.close();
+ }
+
protected static void initTableValues(long ts) throws Exception {
String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -177,4 +205,21 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
conn.commit();
conn.close();
}
+
+ protected static void initTableValues(long ts, int nRows) throws Exception {
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(url, props);
+ PreparedStatement stmt = conn.prepareStatement(
+ "upsert into " +
+ "KEYONLY VALUES (?, ?)");
+ for (int i = 0; i < nRows; i++) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, i+1);
+ stmt.execute();
+ }
+
+ conn.commit();
+ conn.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index faa54ea..cc16911 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.util.TestUtil.getAllSplits;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.sql.Array;
@@ -169,7 +170,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setArray(3, array);
stmt.execute();
conn.commit();
- flush(tableName);
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "b");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -180,7 +180,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setArray(3, array);
stmt.execute();
conn.commit();
- flush(tableName);
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "c");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -191,7 +190,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setArray(3, array);
stmt.execute();
conn.commit();
- flush(tableName);
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "d");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -202,7 +200,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setArray(3, array);
stmt.execute();
conn.commit();
- flush(tableName);
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "b");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -213,7 +210,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setArray(3, array);
stmt.execute();
conn.commit();
- flush(tableName);
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "e");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -224,14 +220,9 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setArray(3, array);
stmt.execute();
conn.commit();
- flush(tableName);
return conn;
}
- private void flush(String tableName) throws IOException, InterruptedException {
- //utility.getHBaseAdmin().flush(tableName.toUpperCase());
- }
-
private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException {
PreparedStatement stmt;
stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
@@ -298,9 +289,12 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
nRegions = services.getAllTableRegions(STATS_TEST_TABLE_BYTES).size();
nTries++;
} while (nRegions == nRegionsNow && nTries < 10);
+ if (nRegions == nRegionsNow) {
+ fail();
+ }
// FIXME: I see the commit of the stats finishing before this with a lower timestamp than the scan timestamp,
// yet without this sleep, the query finds the old data. Seems like an HBase bug and a potentially serious one.
- Thread.sleep(3000);
+ Thread.sleep(8000);
} finally {
admin.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 6388b1a..ba601ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -28,7 +28,7 @@ import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 4ca0bb5..108492a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -42,7 +42,7 @@ import org.apache.phoenix.execute.TupleProjectionPlan;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7604663..335f4b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -606,7 +606,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if (tenantId == null) {
HTableInterface statsHTable = null;
try {
- statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+ statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
timeStamp = Math.max(timeStamp, stats.getTimestamp());
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
@@ -839,7 +839,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// TableName systemCatalogTableName = region.getTableDesc().getTableName();
// HTableInterface hTable = env.getTable(systemCatalogTableName);
// These deprecated calls work around the issue
- HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+ HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
try {
boolean allViewsInCurrentRegion = true;
int numOfChildViews = 0;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index d7a90fb..2ebfa41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -42,8 +42,8 @@ import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelIterators;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index d91ad51..33143bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -44,7 +44,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.iterate.DelegateResultIterator;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.parse.FilterableStatement;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 12dc9ff..578855d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.execute;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -33,12 +34,15 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelIterators;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.ResultIterators;
import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -47,6 +51,9 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.util.SchemaUtil;
@@ -105,7 +112,8 @@ public class ScanPlan extends BaseQueryPlan {
@Override
protected ResultIterator newIterator() throws SQLException {
// Set any scan attributes before creating the scanner, as it will be too late afterwards
- context.getScan().setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
+ Scan scan = context.getScan();
+ scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
ResultIterator scanner;
TableRef tableRef = this.getTableRef();
PTable table = tableRef.getTable();
@@ -114,7 +122,40 @@ public class ScanPlan extends BaseQueryPlan {
* limit is provided, run query serially.
*/
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
- ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
+ boolean isSerial = false;
+ Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
+ /*
+ * If a limit is provided and we have no filter, run the scan serially when we estimate that
+ * the limit's worth of data will fit into a single region.
+ */
+ if (perScanLimit != null && scan.getFilter() == null) {
+ GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
+ long estRowSize = SchemaUtil.estimateRowSize(table);
+ long estRegionSize;
+ if (gpsInfo == null) {
+ // Use guidepost depth as minimum size
+ ConnectionQueryServices services = context.getConnection().getQueryServices();
+ HTableDescriptor desc = services.getTableDescriptor(table.getName().getBytes());
+ int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+ long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+ estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
+ } else {
+ // Region size estimated based on total number of bytes divided by number of regions
+ estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
+ }
+ // TODO: configurable number of bytes?
+ if (perScanLimit * estRowSize < estRegionSize) {
+ isSerial = true;
+ }
+ }
+ ResultIterators iterators;
+ if (isSerial) {
+ iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);
+ } else {
+ iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory);
+ }
splits = iterators.getSplits();
scans = iterators.getScans();
if (isOrdered) {
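To make the new serial-scan heuristic above concrete, here is a rough worked
example of the estimate it performs. All numbers are illustrative assumptions,
not values from the patch:

    public class SerialScanHeuristicExample {
        public static void main(String[] args) {
            long perScanLimit  = 10;   // LIMIT 10, query has no filter
            long estRowSize    = 100;  // assumed result of SchemaUtil.estimateRowSize(table)
            // With guidepost stats: byteCount / (number of guideposts + 1),
            // e.g. 20 MB of data spread across 4 guideposts.
            long estRegionSize = 20000000L / (4 + 1);
            // 10 * 100 = 1,000 bytes < 4,000,000 bytes, so the limit's worth
            // of rows should fit in a single region and the scan runs serially.
            System.out.println(perScanLimit * estRowSize < estRegionSize); // true
        }
    }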
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
new file mode 100644
index 0000000..519c162
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+
+/**
+ *
+ * Class that parallelizes the scan over a table using the ExecutorService provided. Each region of the table will be scanned in parallel with
+ * the results accessible through {@link #getIterators()}
+ *
+ *
+ * @since 0.1
+ */
+public abstract class BaseResultIterators extends ExplainTable implements ResultIterators {
+ private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class);
+ private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
+ private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
+
+ private final List<List<Scan>> scans;
+ private final List<KeyRange> splits;
+ private final PTableStats tableStats;
+ private final byte[] physicalTableName;
+ private final QueryPlan plan;
+
+ static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
+ @Override
+ public KeyRange apply(HRegionLocation region) {
+ return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
+ }
+ };
+
+ private PTable getTable() {
+ return plan.getTableRef().getTable();
+ }
+
+ private boolean useStats() {
+ Scan scan = context.getScan();
+ boolean isPointLookup = context.getScanRanges().isPointLookup();
+ /*
+ * Don't use guide posts if:
+ * 1) We're doing a point lookup, as HBase is fast enough at those
+ * to not need them to be further parallelized. TODO: perf test to verify
+ * 2) We're collecting stats, as in this case we need to scan entire
+ * regions worth of data to track where to put the guide posts.
+ */
+ if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
+ return false;
+ }
+ return true;
+ }
+
+ public BaseResultIterators(QueryPlan plan, Integer perScanLimit)
+ throws SQLException {
+ super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
+ this.plan = plan;
+ StatementContext context = plan.getContext();
+ TableRef tableRef = plan.getTableRef();
+ PTable table = tableRef.getTable();
+ FilterableStatement statement = plan.getStatement();
+ RowProjector projector = plan.getProjector();
+ physicalTableName = table.getPhysicalName().getBytes();
+ tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
+ Scan scan = context.getScan();
+ if (projector.isProjectEmptyKeyValue()) {
+ Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
+ // If nothing projected into scan and we only have one column family, just allow everything
+ // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
+ // be quite a bit faster.
+            // Where-condition columns will also get added into familyMap.
+            // When where conditions are present, we cannot add the FirstKeyOnlyFilter at the beginning.
+ if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
+ && table.getColumnFamilies().size() == 1) {
+ // Project the one column family. We must project a column family since it's possible
+ // that there are other non declared column families that we need to ignore.
+ scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
+ ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
+ } else {
+ byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
+ // Project empty key value unless the column family containing it has
+ // been projected in its entirety.
+ if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
+ scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+ }
+ }
+ } else if (table.getViewType() == ViewType.MAPPED) {
+            // Since we don't have the empty key value in MAPPED tables, we must select all CFs in the HRS, but only the
+            // selected column values are returned to the client.
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ scan.addFamily(family.getName().getBytes());
+ }
+ }
+
+ // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
+ if (perScanLimit != null) {
+ ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
+ }
+
+ doColumnProjectionOptimization(context, scan, table, statement);
+
+ this.scans = getParallelScans();
+ List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
+ for (List<Scan> scanList : scans) {
+ for (Scan aScan : scanList) {
+ splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
+ }
+ }
+ this.splits = ImmutableList.copyOf(splitRanges);
+ }
+
+ private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
+ Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+ if (familyMap != null && !familyMap.isEmpty()) {
+ // columnsTracker contain cf -> qualifiers which should get returned.
+ Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker =
+ new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
+ Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ int referencedCfCount = familyMap.size();
+ for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
+ if (!(familyMap.containsKey(whereCol.getFirst()))) {
+ referencedCfCount++;
+ }
+ }
+ boolean useOptimization;
+ if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
+ // Do not use the optimization
+ useOptimization = false;
+ } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
+ // Strictly use the optimization
+ useOptimization = true;
+ } else {
+                // when referencedCfCount is > 1 and there are no hints, we do not use the optimization
+ useOptimization = referencedCfCount == 1;
+ }
+ if (useOptimization) {
+ for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+ ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+ NavigableSet<byte[]> qs = entry.getValue();
+ NavigableSet<ImmutableBytesPtr> cols = null;
+ if (qs != null) {
+ cols = new TreeSet<ImmutableBytesPtr>();
+ for (byte[] q : qs) {
+ cols.add(new ImmutableBytesPtr(q));
+ }
+ }
+ columnsTracker.put(cf, cols);
+ }
+ }
+            // Make sure that where-condition CFs get scanned at the HRS.
+ for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
+ if (useOptimization) {
+ if (!(familyMap.containsKey(whereCol.getFirst()))) {
+ scan.addFamily(whereCol.getFirst());
+ conditionOnlyCfs.add(whereCol.getFirst());
+ }
+ } else {
+ if (familyMap.containsKey(whereCol.getFirst())) {
+                    // where column's CF is present. If there are specific columns added against this CF, we
+                    // need to ensure this where column also gets added to it.
+                    // If the select was like select cf1.*, then that by itself will select the whole CF, so there is no need
+                    // to specifically add the where column. Adding it would remove the cf1.* projection and only this
+                    // where-condition column would get returned!
+ NavigableSet<byte[]> cols = familyMap.get(whereCol.getFirst());
+ // cols is null means the whole CF will get scanned.
+ if (cols != null) {
+ scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
+ }
+ } else {
+ // where column's CF itself is not present in family map. We need to add the column
+ scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
+ }
+ }
+ }
+ if (useOptimization && !columnsTracker.isEmpty()) {
+ for (ImmutableBytesPtr f : columnsTracker.keySet()) {
+                // This addFamily call removes explicit cols from the scan familyMap, making it project the entire row.
+                // We don't want the ExplicitColumnTracker to be used; instead we have the ColumnProjectionFilter.
+ scan.addFamily(f.get());
+ }
+ // We don't need this filter for aggregates, as we're not returning back what's
+ // in the scan in this case. We still want the other optimization that causes
+ // the ExplicitColumnTracker not to be used, though.
+ if (!(statement.isAggregate())) {
+ ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+ columnsTracker, conditionOnlyCfs));
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<KeyRange> getSplits() {
+ return splits;
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return scans;
+ }
+
+ private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
+ int nBoundaries = regionLocations.size() - 1;
+ List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
+ for (int i = 0; i < nBoundaries; i++) {
+ HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
+ ranges.add(regionInfo.getEndKey());
+ }
+ return ranges;
+ }
+
+ private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
+ int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
+ // If we found an exact match, return the index+1, as the inclusiveKey will be contained
+ // in the next region (since we're matching on the end boundary).
+ guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+ return guideIndex;
+ }
+
+ private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
+ int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
+ // If we found an exact match, return the index we found as the exclusiveKey won't be
+ // contained in the next region as with getIndexContainingInclusive.
+ guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
+ return guideIndex;
+ }
+
+ private List<byte[]> getGuidePosts() {
+ /*
+ * Don't use guide posts if:
+ * 1) We're doing a point lookup, as HBase is fast enough at those
+         * to not need them to be further parallelized. TODO: perf test to verify
+ * 2) We're collecting stats, as in this case we need to scan entire
+ * regions worth of data to track where to put the guide posts.
+ */
+ if (!useStats()) {
+ return Collections.emptyList();
+ }
+
+ List<byte[]> gps = null;
+ PTable table = getTable();
+ Map<byte[],GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
+ byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
+ if (table.getColumnFamilies().isEmpty()) {
+ // For sure we can get the defaultCF from the table
+ if (guidePostMap.get(defaultCF) != null) {
+ gps = guidePostMap.get(defaultCF).getGuidePosts();
+ }
+ } else {
+ Scan scan = context.getScan();
+ if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
+ // If default CF is not used in scan, use first CF referenced in scan
+ GuidePostsInfo guidePostsInfo = guidePostMap.get(scan.getFamilyMap().keySet().iterator().next());
+ if (guidePostsInfo != null) {
+ gps = guidePostsInfo.getGuidePosts();
+ }
+ } else {
+ // Otherwise, favor use of default CF.
+ if (guidePostMap.get(defaultCF) != null) {
+ gps = guidePostMap.get(defaultCF).getGuidePosts();
+ }
+ }
+ }
+ if (gps == null) {
+ return Collections.emptyList();
+ }
+ return gps;
+ }
+
+ private static String toString(List<byte[]> gps) {
+ StringBuilder buf = new StringBuilder(gps.size() * 100);
+ buf.append("[");
+ for (int i = 0; i < gps.size(); i++) {
+ buf.append(Bytes.toStringBinary(gps.get(i)));
+ buf.append(",");
+ if (i > 0 && i < gps.size()-1 && (i % 10) == 0) {
+ buf.append("\n");
+ }
+ }
+ buf.setCharAt(buf.length()-1, ']');
+ return buf.toString();
+ }
+
+ private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
+ PTable table = getTable();
+ boolean startNewScanList = false;
+ if (!plan.isRowKeyOrdered()) {
+ startNewScanList = true;
+ } else if (crossedRegionBoundary) {
+ if (table.getIndexType() == IndexType.LOCAL) {
+ startNewScanList = true;
+ } else if (table.getBucketNum() != null) {
+ startNewScanList = scans.isEmpty() ||
+ ScanUtil.crossesPrefixBoundary(startKey,
+ ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES),
+ SaltingUtil.NUM_SALTING_BYTES);
+ }
+ }
+ if (scan != null) {
+ scans.add(scan);
+ }
+ if (startNewScanList && !scans.isEmpty()) {
+ parallelScans.add(scans);
+ scans = Lists.newArrayListWithExpectedSize(1);
+ }
+ return scans;
+ }
+
+ private List<List<Scan>> getParallelScans() throws SQLException {
+ return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ }
+
+ /**
+     * Compute the list of parallel scans to run for a given query. The scans
+     * within each inner list may be concatenated together directly, while the
+     * outer lists may need to be merge sorted, depending on the query.
+ * @return list of parallel scans to run for a given query.
+ * @throws SQLException
+ */
+ private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
+ Scan scan = context.getScan();
+ List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+ .getAllTableRegions(physicalTableName);
+
+ List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+ ScanRanges scanRanges = context.getScanRanges();
+ PTable table = getTable();
+ boolean isSalted = table.getBucketNum() != null;
+ boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
+ List<byte[]> gps = getGuidePosts();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Guideposts: " + toString(gps));
+ }
+ boolean traverseAllRegions = isSalted || isLocalIndex;
+ if (!traverseAllRegions) {
+ byte[] scanStartRow = scan.getStartRow();
+ if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
+ startKey = scanStartRow;
+ }
+ byte[] scanStopRow = scan.getStopRow();
+ if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) {
+ stopKey = scanStopRow;
+ }
+ }
+
+ int regionIndex = 0;
+ int stopIndex = regionBoundaries.size();
+ if (startKey.length > 0) {
+ regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+ }
+ if (stopKey.length > 0) {
+ stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+ if (isLocalIndex) {
+ stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
+ }
+ }
+ List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+
+ byte[] currentKey = startKey;
+ int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
+ int gpsSize = gps.size();
+ int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
+ int keyOffset = 0;
+ List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
+ // Merge bisect with guideposts for all but the last region
+ while (regionIndex <= stopIndex) {
+ byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY;
+ if (regionIndex == stopIndex) {
+ endKey = stopKey;
+ } else {
+ endKey = regionBoundaries.get(regionIndex);
+ }
+ if (isLocalIndex) {
+ HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
+ endRegionKey = regionInfo.getEndKey();
+ keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
+ }
+ while (guideIndex < gpsSize
+ && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+ Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false);
+ scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false);
+ currentKey = currentGuidePost;
+ guideIndex++;
+ }
+ Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset, true);
+ if (isLocalIndex) {
+ if (newScan != null) {
+ newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
+ } else if (!scans.isEmpty()) {
+ scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
+ }
+ }
+ scans = addNewScan(parallelScans, scans, newScan, endKey, true);
+ currentKey = endKey;
+ regionIndex++;
+ }
+ if (!scans.isEmpty()) { // Add any remaining scans
+ parallelScans.add(scans);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans,
+ ScanUtil.getCustomAnnotations(scan)));
+ }
+ return parallelScans;
+ }
+
+ public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
+ if (!reverse) {
+ return list;
+ }
+ return Lists.reverse(list);
+ }
+
+ /**
+ * Executes the scan in parallel across all regions, blocking until all scans are complete.
+ * @return the result iterators for the scan of each region
+ */
+ @Override
+ public List<PeekingResultIterator> getIterators() throws SQLException {
+ boolean success = false;
+ boolean isReverse = ScanUtil.isReversed(context.getScan());
+ boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
+ final ConnectionQueryServices services = context.getConnection().getQueryServices();
+ ReadOnlyProps props = services.getProps();
+ int numSplits = size();
+ List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
+ final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+ // TODO: what purpose does this scanID serve?
+ final UUID scanId = UUID.randomUUID();
+ try {
+ submitWork(scanId, scans, futures, splits.size());
+ int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
+ boolean clearedCache = false;
+ for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
+ List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
+ for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
+ try {
+ PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ concatIterators.add(iterator);
+ } catch (ExecutionException e) {
+ try { // Rethrow as SQLException
+ throw ServerUtil.parseServerException(e);
+ } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+ List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
+ if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+ services.clearTableRegionCache(physicalTableName);
+ clearedCache = true;
+ }
+ // Resubmit just this portion of work again
+ Scan oldScan = scanPair.getFirst();
+ byte[] startKey = oldScan.getStartRow();
+ byte[] endKey = oldScan.getStopRow();
+ if (isLocalIndex) {
+ endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
+ }
+ List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
+ // Add any concatIterators that were successful so far
+ // as we need these to be in order
+ addIterator(iterators, concatIterators);
+ concatIterators = Collections.emptyList();
+ submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
+ for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
+ for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
+                                    // Immediately do a get (not catching the exception again) and then add the iterators we
+ // get back immediately. They'll be sorted as expected, since they're replacing the
+ // original one.
+ PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ iterators.add(iterator);
+ }
+ }
+ }
+ }
+ }
+ addIterator(iterators, concatIterators);
+ }
+
+ success = true;
+ return iterators;
+ } catch (SQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ if (!success) {
+ SQLCloseables.closeAllQuietly(iterators);
+ // Don't call cancel on already started work, as it causes the HConnection
+ // to get into a funk. Instead, just cancel queued work.
+ for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
+ for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
+ futurePair.getSecond().cancel(false);
+ }
+ }
+ }
+ }
+ }
+
+ private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) {
+ if (!childIterators.isEmpty()) {
+ parentIterators.add(ConcatResultIterator.newIterator(childIterators));
+ }
+ }
+
+ protected static final class ScanLocator {
+ private final int outerListIndex;
+ private final int innerListIndex;
+ private final Scan scan;
+
+ public ScanLocator(Scan scan, int outerListIndex, int innerListIndex) {
+ this.outerListIndex = outerListIndex;
+ this.innerListIndex = innerListIndex;
+ this.scan = scan;
+ }
+ public int getOuterListIndex() {
+ return outerListIndex;
+ }
+ public int getInnerListIndex() {
+ return innerListIndex;
+ }
+ public Scan getScan() {
+ return scan;
+ }
+ }
+
+
+ abstract protected String getName();
+ abstract protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+ List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize);
+
+ @Override
+ public int size() {
+ return this.scans.size();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean(
+ QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB,
+ QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT);
+ StringBuilder buf = new StringBuilder();
+ buf.append("CLIENT " + (displayChunkCount ? (this.splits.size() + "-CHUNK ") : "") + getName() + " " + size() + "-WAY ");
+ explain(buf.toString(),planSteps);
+ }
+
+ @Override
+ public String toString() {
+ return "ResultIterators [name=" + getName() + ",scans=" + scans + "]";
+ }
+}
\ No newline at end of file
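The two index helpers in BaseResultIterators differ only in how an exact match
on a region end key is treated. Here is a self-contained sketch of the same
binary-search arithmetic, using Strings in place of byte[] region end keys
(the keys are made up for illustration):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class BoundaryIndexExample {
        static int inclusive(List<String> boundaries, String key) {
            int i = Collections.binarySearch(boundaries, key);
            return i < 0 ? -(i + 1) : i + 1; // exact match: key belongs to the next region
        }
        static int exclusive(List<String> boundaries, String key) {
            int i = Collections.binarySearch(boundaries, key);
            return i < 0 ? -(i + 1) : i;     // exact match: key is this region's exclusive end
        }
        public static void main(String[] args) {
            // End keys of regions 0..2; the last region's end key is unbounded.
            List<String> endKeys = Arrays.asList("b", "d", "f");
            System.out.println(inclusive(endKeys, "d")); // 2: a start key of "d" falls in region 2
            System.out.println(exclusive(endKeys, "d")); // 1: a stop key of "d" still ends region 1
            System.out.println(inclusive(endKeys, "c")); // 1
        }
    }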
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 4a62259..fecb0d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -44,7 +44,7 @@ import com.google.common.base.Preconditions;
public class ChunkedResultIterator implements PeekingResultIterator {
private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
- private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory;
+ private final ParallelIteratorFactory delegateIteratorFactory;
private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
private final StatementContext context;
private final TableRef tableRef;
@@ -52,12 +52,12 @@ public class ChunkedResultIterator implements PeekingResultIterator {
private final long chunkSize;
private PeekingResultIterator resultIterator;
- public static class ChunkedResultIteratorFactory implements ParallelIterators.ParallelIteratorFactory {
+ public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory {
- private final ParallelIterators.ParallelIteratorFactory delegateFactory;
+ private final ParallelIteratorFactory delegateFactory;
private final TableRef tableRef;
- public ChunkedResultIteratorFactory(ParallelIterators.ParallelIteratorFactory
+ public ChunkedResultIteratorFactory(ParallelIteratorFactory
delegateFactory, TableRef tableRef) {
this.delegateFactory = delegateFactory;
this.tableRef = tableRef;
@@ -74,7 +74,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
}
}
- public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory,
+ public ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException {
this.delegateIteratorFactory = delegateIteratorFactory;
this.context = context;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index cddf3b3..03f8785 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -39,8 +39,13 @@ public class ConcatResultIterator implements PeekingResultIterator {
this.resultIterators = iterators;
}
+ private ConcatResultIterator(List<PeekingResultIterator> iterators) {
+ this.resultIterators = null;
+ this.iterators = iterators;
+ }
+
private List<PeekingResultIterator> getIterators() throws SQLException {
- if (iterators == null) {
+ if (iterators == null && resultIterators != null) {
iterators = resultIterators.getIterators();
}
return iterators;
@@ -59,7 +64,9 @@ public class ConcatResultIterator implements PeekingResultIterator {
@Override
public void explain(List<String> planSteps) {
- resultIterators.explain(planSteps);
+ if (resultIterators != null) {
+ resultIterators.explain(planSteps);
+ }
}
private PeekingResultIterator currentIterator() throws SQLException {
@@ -88,11 +95,11 @@ public class ConcatResultIterator implements PeekingResultIterator {
@Override
public String toString() {
- return "ConcatResultIterator [resultIterators=" + resultIterators
- + ", iterators=" + iterators + ", index=" + index + "]";
+ return "ConcatResultIterator [" + resultIterators == null ? ("iterators=" + iterators) : ("resultIterators=" + resultIterators)
+ + ", index=" + index + "]";
}
- public static PeekingResultIterator newConcatResultIterator(final List<PeekingResultIterator> concatIterators) {
+ public static PeekingResultIterator newIterator(final List<PeekingResultIterator> concatIterators) {
if (concatIterators.isEmpty()) {
return PeekingResultIterator.EMPTY_ITERATOR;
}
@@ -100,24 +107,6 @@ public class ConcatResultIterator implements PeekingResultIterator {
if (concatIterators.size() == 1) {
return concatIterators.get(0);
}
- return new ConcatResultIterator(new ResultIterators() {
-
- @Override
- public List<PeekingResultIterator> getIterators() throws SQLException {
- return concatIterators;
- }
-
- @Override
- public int size() {
- return concatIterators.size();
- }
-
- @Override
- public void explain(List<String> planSteps) {
- // TODO: review what we should for explain plan here
- concatIterators.get(0).explain(planSteps);
- }
-
- });
+ return new ConcatResultIterator(concatIterators);
}
}
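The renamed factory method keeps its short-circuit behavior: an empty list
yields the shared empty iterator and a single iterator is returned unwrapped.
A hedged usage sketch, as a code fragment; it1 and it2 stand in for any
per-scan PeekingResultIterator instances and are not defined in the patch:

    PeekingResultIterator it1 = PeekingResultIterator.EMPTY_ITERATOR;
    PeekingResultIterator it2 = PeekingResultIterator.EMPTY_ITERATOR;
    // Empty input -> the shared EMPTY_ITERATOR, no allocation:
    PeekingResultIterator none = ConcatResultIterator.newIterator(
        Collections.<PeekingResultIterator>emptyList());
    // Single input -> returned as-is, no wrapper:
    PeekingResultIterator one = ConcatResultIterator.newIterator(
        Collections.singletonList(it1));
    // Two or more -> a ConcatResultIterator now holding the list directly,
    // with no anonymous ResultIterators delegate in between:
    PeekingResultIterator both = ConcatResultIterator.newIterator(
        Arrays.asList(it1, it2));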
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
index 98b95c6..63b3142 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
@@ -30,6 +30,10 @@ public class DelegateResultIterator implements ResultIterator {
this.delegate = delegate;
}
+ protected ResultIterator getDelegate() {
+ return delegate;
+ }
+
@Override
public void close() throws SQLException {
delegate.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
new file mode 100644
index 0000000..a80693d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ *
+ * Iterates through tuples up to a limit
+ *
+ *
+ * @since 1.2
+ */
+public class LimitingPeekingResultIterator extends LimitingResultIterator implements PeekingResultIterator {
+
+ public LimitingPeekingResultIterator(PeekingResultIterator delegate, int limit) {
+ super(delegate, limit);
+ }
+
+
+ @Override
+ protected PeekingResultIterator getDelegate() {
+ return (PeekingResultIterator) super.getDelegate();
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ return getDelegate().peek();
+ }
+}
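A minimal usage sketch of the new class; the delegate here is just the shared
empty iterator, while in the patch it would presumably wrap a real per-scan
iterator when a limited query runs serially:

    // Caps any PeekingResultIterator at a row limit; peek() is forwarded
    // to the delegate via the covariant getDelegate() override.
    PeekingResultIterator delegate = PeekingResultIterator.EMPTY_ITERATOR;
    PeekingResultIterator limited = new LimitingPeekingResultIterator(delegate, 1);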
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
new file mode 100644
index 0000000..1ad3af0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.StatementContext;
+
+public interface ParallelIteratorFactory {
+ PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
+}
\ No newline at end of file
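The extracted interface is small enough that an implementation is a single
method. A rough pass-through sketch, not from the patch, which assumes a
LookAheadResultIterator.wrap(ResultIterator) helper is available in this
package to adapt a plain ResultIterator into a PeekingResultIterator:

    package org.apache.phoenix.iterate;

    import java.sql.SQLException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.compile.StatementContext;

    public class PassThroughIteratorFactory implements ParallelIteratorFactory {
        @Override
        public PeekingResultIterator newIterator(StatementContext context,
                ResultIterator scanner, Scan scan) throws SQLException {
            // Assumed helper: wraps the scanner so callers can peek().
            return LookAheadResultIterator.wrap(scanner);
        }
    }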