You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/04/28 18:03:16 UTC
[1/2] git commit: PHOENIX-951 Don't push LIMIT as PageFilter for joins
Repository: incubator-phoenix
Updated Branches:
refs/heads/4.0 12e5f7063 -> f6e70b6ee
PHOENIX-951 Don't push LIMIT as PageFilter for joins
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/9e80bbe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/9e80bbe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/9e80bbe7
Branch: refs/heads/4.0
Commit: 9e80bbe7624b8ce3c83769dc97504200bcd8b56c
Parents: c4e3dd8
Author: maryannxue <ma...@apache.org>
Authored: Mon Apr 28 12:02:38 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Mon Apr 28 12:02:38 2014 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/HashJoinIT.java | 143 +++++++++++++++++++
.../apache/phoenix/compile/JoinCompiler.java | 15 +-
.../apache/phoenix/compile/QueryCompiler.java | 24 ++--
.../coprocessor/HashJoinRegionScanner.java | 13 +-
.../apache/phoenix/execute/HashJoinPlan.java | 34 ++---
.../org/apache/phoenix/execute/ScanPlan.java | 6 +-
.../org/apache/phoenix/join/HashJoinInfo.java | 16 ++-
7 files changed, 215 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/9e80bbe7/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 2e0d329..6493a2e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -404,6 +404,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
" PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
" BUILD HASH TABLE 0\n" +
" CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+ /*
+ * testJoinWithLimit()
+ * SELECT order_id, i.name, s.name, s.address, quantity
+ * FROM joinSupplierTable s
+ * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id
+ * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER FILTER BY PageFilter 4\n" +
+ " SERVER 4 ROW LIMIT\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+ " BUILD HASH TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+ " BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
+ /*
+ * testJoinWithLimit()
+ * SELECT order_id, i.name, s.name, s.address, quantity
+ * FROM joinSupplierTable s
+ * JOIN joinItemTable i ON i.supplier_id = s.supplier_id
+ * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+ " BUILD HASH TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+ " BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
}});
testCases.add(new String[][] {
{
@@ -710,6 +742,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
" PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
" BUILD HASH TABLE 0\n" +
" CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+ /*
+ * testJoinWithLimit()
+ * SELECT order_id, i.name, s.name, s.address, quantity
+ * FROM joinSupplierTable s
+ * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id
+ * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER FILTER BY PageFilter 4\n" +
+ " SERVER 4 ROW LIMIT\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+ " BUILD HASH TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" +
+ " BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
+ /*
+ * testJoinWithLimit()
+ * SELECT order_id, i.name, s.name, s.address, quantity
+ * FROM joinSupplierTable s
+ * JOIN joinItemTable i ON i.supplier_id = s.supplier_id
+ * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+ " BUILD HASH TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" +
+ " BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
}});
return testCases;
}
@@ -3015,6 +3079,85 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
conn.close();
}
}
+
+ @Test
+ public void testJoinWithLimit() throws Exception {
+ String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s LEFT JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN "
+ + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 4";
+ String query2 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s JOIN "
+ + JOIN_ITEM_TABLE_FULL_NAME + " i ON i.\"supplier_id\" = s.\"supplier_id\" JOIN "
+ + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 4";
+ Properties props = new Properties(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query1);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getString(4), "101 YYY Street");
+ assertEquals(rs.getInt(5), 1000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getString(4), "101 YYY Street");
+ assertEquals(rs.getInt(5), 3000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getString(4), "202 YYY Street");
+ assertEquals(rs.getInt(5), 5000);
+ assertTrue (rs.next());
+ assertNull(rs.getString(1));
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getString(4), "202 YYY Street");
+ assertEquals(rs.getInt(5), 0);
+
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query1);
+ assertEquals(plans[19], QueryUtil.getExplainPlan(rs));
+
+ statement = conn.prepareStatement(query2);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getString(4), "101 YYY Street");
+ assertEquals(rs.getInt(5), 1000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertEquals(rs.getString(3), "S1");
+ assertEquals(rs.getString(4), "101 YYY Street");
+ assertEquals(rs.getInt(5), 3000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "S2");
+ assertEquals(rs.getString(4), "202 YYY Street");
+ assertEquals(rs.getInt(5), 5000);
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertEquals(rs.getString(3), "S6");
+ assertEquals(rs.getString(4), "606 YYY Street");
+ assertEquals(rs.getInt(5), 2000);
+
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query2);
+ assertEquals(plans[20], QueryUtil.getExplainPlan(rs));
+ } finally {
+ conn.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/9e80bbe7/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index c494194..7fb2d6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -225,6 +225,7 @@ public class JoinCompiler {
private final List<ParseNode> postFilters;
private final List<Table> tables;
private final List<TableRef> tableRefs;
+ private final boolean allLeftJoin;
private final boolean hasRightJoin;
private final List<JoinTable> prefilterAcceptedTables;
@@ -234,6 +235,7 @@ public class JoinCompiler {
this.postFilters = Collections.<ParseNode>emptyList();
this.tables = Collections.<Table>singletonList(table);
this.tableRefs = Collections.<TableRef>singletonList(table.getTableRef());
+ this.allLeftJoin = false;
this.hasRightJoin = false;
this.prefilterAcceptedTables = Collections.<JoinTable>emptyList();
}
@@ -245,16 +247,20 @@ public class JoinCompiler {
this.tables = new ArrayList<Table>();
this.tableRefs = new ArrayList<TableRef>();
this.tables.add(table);
+ boolean allLeftJoin = true;
int lastRightJoinIndex = -1;
for (int i = 0; i < joinSpecs.size(); i++) {
- this.tables.addAll(joinSpecs.get(i).getJoinTable().getTables());
- if (joinSpecs.get(i).getType() == JoinType.Right) {
+ JoinSpec joinSpec = joinSpecs.get(i);
+ this.tables.addAll(joinSpec.getJoinTable().getTables());
+ allLeftJoin = allLeftJoin && joinSpec.getType() == JoinType.Left;
+ if (joinSpec.getType() == JoinType.Right) {
lastRightJoinIndex = i;
}
}
for (Table t : this.tables) {
this.tableRefs.add(t.getTableRef());
}
+ this.allLeftJoin = allLeftJoin;
this.hasRightJoin = lastRightJoinIndex > -1;
this.prefilterAcceptedTables = new ArrayList<JoinTable>();
for (int i = lastRightJoinIndex == -1 ? 0 : lastRightJoinIndex; i < joinSpecs.size(); i++) {
@@ -281,6 +287,10 @@ public class JoinCompiler {
return tableRefs;
}
+ public boolean isAllLeftJoin() {
+ return allLeftJoin;
+ }
+
public SelectStatement getStatement() {
return select;
}
@@ -351,7 +361,6 @@ public class JoinCompiler {
* Returns a boolean vector indicating whether the evaluation of join expressions
* can be evaluated at an early stage if the input JoinSpec can be taken as a
* star join. Otherwise returns null.
- * @param join the JoinSpec
* @return a boolean vector for a star join; or null for non star join.
*/
public boolean[] getStarJoinVector() {
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/9e80bbe7/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 8b5cd93..1f39ad9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -127,7 +127,7 @@ public class QueryCompiler {
JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
return compileJoinQuery(context, binds, joinTable, false);
} else {
- return compileSingleQuery(context, select, binds, parallelIteratorFactory);
+ return compileSingleQuery(context, select, binds, parallelIteratorFactory, true);
}
}
@@ -144,7 +144,7 @@ public class QueryCompiler {
context.setCurrentTable(table.getTableRef());
context.setResolver(projectedTable.createColumnResolver());
table.projectColumns(context.getScan());
- return compileSingleQuery(context, subquery, binds, null);
+ return compileSingleQuery(context, subquery, binds, null, true);
}
QueryPlan plan = compileSubquery(subquery);
ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector());
@@ -219,9 +219,13 @@ public class QueryCompiler {
}
context.setCurrentTable(tableRef);
context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
- BasicQueryPlan plan = compileSingleQuery(context, query, binds, parallelIteratorFactory);
+ BasicQueryPlan plan = compileSingleQuery(context, query, binds, parallelIteratorFactory, joinTable.isAllLeftJoin());
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
- HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, forceProjection);
+ Integer limit = null;
+ if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
+ limit = LimitCompiler.compile(context, query);
+ }
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit, forceProjection);
return new HashJoinPlan(joinTable.getStatement(), plan, joinInfo, hashExpressions, joinPlans, clientProjectors);
}
@@ -270,9 +274,13 @@ public class QueryCompiler {
TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector());
context.setCurrentTable(rhsTableRef);
context.setResolver(projectedTable.createColumnResolver());
- BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, parallelIteratorFactory);
+ BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, parallelIteratorFactory, type == JoinType.Right);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
- HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, forceProjection);
+ Integer limit = null;
+ if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
+ limit = LimitCompiler.compile(context, rhs);
+ }
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
return new HashJoinPlan(joinTable.getStatement(), rhsPlan, joinInfo, new List[] {hashExpressions}, new QueryPlan[] {lhsPlan}, new TupleProjector[] {clientProjector});
}
@@ -287,7 +295,7 @@ public class QueryCompiler {
return statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
}
- protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, ParallelIteratorFactory parallelIteratorFactory) throws SQLException{
+ protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException{
PhoenixConnection connection = statement.getConnection();
ColumnResolver resolver = context.getResolver();
TableRef tableRef = context.getCurrentTable();
@@ -328,7 +336,7 @@ public class QueryCompiler {
if (select.isAggregate() || select.isDistinct()) {
return new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having);
} else {
- return new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory);
+ return new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/9e80bbe7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 0be219f..47ffce7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -54,6 +54,8 @@ public class HashJoinRegionScanner implements RegionScanner {
private final HashJoinInfo joinInfo;
private Queue<Tuple> resultQueue;
private boolean hasMore;
+ private long count;
+ private long limit;
private HashCache[] hashCaches;
private List<Tuple>[] tempTuples;
private ValueBitSet tempDestBitSet;
@@ -66,11 +68,16 @@ public class HashJoinRegionScanner implements RegionScanner {
this.joinInfo = joinInfo;
this.resultQueue = new LinkedList<Tuple>();
this.hasMore = true;
+ this.count = 0;
+ this.limit = Long.MAX_VALUE;
if (joinInfo != null) {
for (JoinType type : joinInfo.getJoinTypes()) {
if (type != JoinType.Inner && type != JoinType.Left)
throw new DoNotRetryIOException("Got join type '" + type + "'. Expect only INNER or LEFT with hash-joins.");
}
+ if (joinInfo.getLimit() != null) {
+ this.limit = joinInfo.getLimit();
+ }
int count = joinInfo.getJoinIds().length;
this.tempTuples = new List[count];
this.hashCaches = new HashCache[count];
@@ -93,7 +100,7 @@ public class HashJoinRegionScanner implements RegionScanner {
}
}
- private void processResults(List<Cell> result, boolean hasLimit) throws IOException {
+ private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
if (result.isEmpty())
return;
@@ -106,7 +113,7 @@ public class HashJoinRegionScanner implements RegionScanner {
return;
}
- if (hasLimit)
+ if (hasBatchLimit)
throw new UnsupportedOperationException("Cannot support join operations in scans with limit");
int count = joinInfo.getJoinIds().length;
@@ -204,7 +211,7 @@ public class HashJoinRegionScanner implements RegionScanner {
for (int i = 0; i < tuple.size(); i++) {
results.add(tuple.getValue(i));
}
- return resultQueue.isEmpty() ? hasMore : true;
+ return (count++ < limit) && (resultQueue.isEmpty() ? hasMore : true);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/9e80bbe7/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index abd4475..401c15b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -57,14 +57,14 @@ import com.google.common.collect.Lists;
public class HashJoinPlan implements QueryPlan {
private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
-
+
private final FilterableStatement statement;
private final BasicQueryPlan plan;
private final HashJoinInfo joinInfo;
private final List<Expression>[] hashExpressions;
private final QueryPlan[] hashPlans;
private final TupleProjector[] clientProjectors;
-
+
public HashJoinPlan(FilterableStatement statement,
BasicQueryPlan plan, HashJoinInfo joinInfo,
List<Expression>[] hashExpressions, QueryPlan[] hashPlans,
@@ -96,11 +96,11 @@ public class HashJoinPlan implements QueryPlan {
public ResultIterator iterator() throws SQLException {
ImmutableBytesPtr[] joinIds = joinInfo.getJoinIds();
assert (joinIds.length == hashExpressions.length && joinIds.length == hashPlans.length);
-
+
final HashCacheClient hashClient = new HashCacheClient(plan.getContext().getConnection());
Scan scan = plan.getContext().getScan();
final ScanRanges ranges = plan.getContext().getScanRanges();
-
+
int count = joinIds.length;
ConnectionQueryServices services = getContext().getConnection().getQueryServices();
ExecutorService executor = services.getExecutor();
@@ -144,7 +144,7 @@ public class HashJoinPlan implements QueryPlan {
} catch (ExecutionException e) {
if (firstException == null) {
firstException = new SQLException("Encountered exception in hash plan [" + i + "] execution.",
- e.getCause());
+ e.getCause());
}
}
}
@@ -152,12 +152,12 @@ public class HashJoinPlan implements QueryPlan {
SQLCloseables.closeAllQuietly(dependencies);
throw firstException;
}
-
+
HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
-
+
return plan.iterator(dependencies);
}
-
+
@Override
public long getEstimatedSize() {
return plan.getEstimatedSize();
@@ -177,16 +177,19 @@ public class HashJoinPlan implements QueryPlan {
for (int i = 0; i < count; i++) {
boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
boolean skipMerge = joinInfo.getSchemas()[i].getFieldCount() == 0;
- planSteps.add(" BUILD HASH TABLE " + i + (earlyEvaluation ? "" : "(DELAYED EVALUATION)") + (skipMerge ? " (SKIP MERGE)" : ""));
- List<String> steps = hashPlans[i].getExplainPlan().getPlanSteps();
- for (String step : steps) {
- planSteps.add(" " + step);
- }
+ planSteps.add(" BUILD HASH TABLE " + i + (earlyEvaluation ? "" : "(DELAYED EVALUATION)") + (skipMerge ? " (SKIP MERGE)" : ""));
+ List<String> steps = hashPlans[i].getExplainPlan().getPlanSteps();
+ for (String step : steps) {
+ planSteps.add(" " + step);
+ }
}
if (joinInfo.getPostJoinFilterExpression() != null) {
- planSteps.add(" AFTER-JOIN SERVER FILTER BY " + joinInfo.getPostJoinFilterExpression().toString());
+ planSteps.add(" AFTER-JOIN SERVER FILTER BY " + joinInfo.getPostJoinFilterExpression().toString());
}
-
+ if (joinInfo.getLimit() != null) {
+ planSteps.add(" JOIN-SCANNER " + joinInfo.getLimit() + " ROW LIMIT");
+ }
+
return new ExplainPlan(planSteps);
}
@@ -217,7 +220,6 @@ public class HashJoinPlan implements QueryPlan {
@Override
public boolean isDegenerate() {
- // TODO can we determine this won't return anything?
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/9e80bbe7/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d09ed34..fb11b47 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -57,11 +57,13 @@ import org.apache.phoenix.util.ScanUtil;
*/
public class ScanPlan extends BasicQueryPlan {
private List<KeyRange> splits;
+ private boolean allowPageFilter;
- public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) {
+ public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null,
parallelIteratorFactory != null ? parallelIteratorFactory :
new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+ this.allowPageFilter = allowPageFilter;
if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -89,7 +91,7 @@ public class ScanPlan extends BasicQueryPlan {
* limit is provided, run query serially.
*/
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
- ParallelIterators iterators = new ParallelIterators(context, tableRef, statement, projection, GroupBy.EMPTY_GROUP_BY, isOrdered ? null : limit, parallelIteratorFactory);
+ ParallelIterators iterators = new ParallelIterators(context, tableRef, statement, projection, GroupBy.EMPTY_GROUP_BY, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
splits = iterators.getSplits();
if (isOrdered) {
scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/9e80bbe7/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
index 62f8c71..3cbf58f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -45,10 +45,11 @@ public class HashJoinInfo {
private KeyValueSchema[] schemas;
private int[] fieldPositions;
private Expression postJoinFilterExpression;
+ private Integer limit;
private boolean forceProjection;
- public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression, boolean forceProjection) {
- this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression, forceProjection);
+ public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) {
+ this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression, limit, forceProjection);
}
private static KeyValueSchema[] buildSchemas(PTable[] tables) {
@@ -71,7 +72,7 @@ public class HashJoinInfo {
return builder.build();
}
- private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression, boolean forceProjection) {
+ private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) {
this.joinedSchema = joinedSchema;
this.joinIds = joinIds;
this.joinExpressions = joinExpressions;
@@ -80,6 +81,7 @@ public class HashJoinInfo {
this.schemas = schemas;
this.fieldPositions = fieldPositions;
this.postJoinFilterExpression = postJoinFilterExpression;
+ this.limit = limit;
this.forceProjection = forceProjection;
}
@@ -115,6 +117,10 @@ public class HashJoinInfo {
return postJoinFilterExpression;
}
+ public Integer getLimit() {
+ return limit;
+ }
+
/*
* If the LHS table is a sub-select, we always do projection, since
* the ON expressions reference only projected columns.
@@ -148,6 +154,7 @@ public class HashJoinInfo {
} else {
WritableUtils.writeVInt(output, -1);
}
+ WritableUtils.writeVInt(output, joinInfo.limit == null ? -1 : joinInfo.limit);
output.writeBoolean(joinInfo.forceProjection);
scan.setAttribute(HASH_JOIN, stream.toByteArray());
} catch (IOException e) {
@@ -204,8 +211,9 @@ public class HashJoinInfo {
postJoinFilterExpression = ExpressionType.values()[expressionOrdinal].newInstance();
postJoinFilterExpression.readFields(input);
}
+ int limit = WritableUtils.readVInt(input);
boolean forceProjection = input.readBoolean();
- return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression, forceProjection);
+ return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression, limit >= 0 ? limit : null, forceProjection);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
[2/2] git commit: Merge branch '4.0' of
https://git-wip-us.apache.org/repos/asf/incubator-phoenix into 4.0
Posted by ma...@apache.org.
Merge branch '4.0' of https://git-wip-us.apache.org/repos/asf/incubator-phoenix into 4.0
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/f6e70b6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/f6e70b6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/f6e70b6e
Branch: refs/heads/4.0
Commit: f6e70b6eee1297d57c7cc5a5bb7cefb203785baf
Parents: 9e80bbe 12e5f70
Author: maryannxue <ma...@apache.org>
Authored: Mon Apr 28 12:03:04 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Mon Apr 28 12:03:04 2014 -0400
----------------------------------------------------------------------
bin/performance.py | 19 +-
bin/phoenix_utils.py | 20 ++
bin/psql.py | 13 +-
bin/readme.txt | 6 +-
bin/sqlline.py | 13 +-
dev/make_rc.sh | 133 +++++++++++
dev/release_files/LICENSE | 228 +++++++++++++++++++
dev/release_files/NOTICE | 49 ++++
.../phoenix/filter/ColumnProjectionFilter.java | 23 +-
phoenix-pig/pom.xml | 23 +-
.../phoenix/pig/PhoenixHBaseLoaderIT.java | 7 +-
.../phoenix/pig/PhoenixHBaseStorerIT.java | 9 +-
pom.xml | 1 +
13 files changed, 486 insertions(+), 58 deletions(-)
----------------------------------------------------------------------