You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:41:12 UTC
[37/50] [abbrv] phoenix git commit: PHOENIX-1680
phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
PHOENIX-1680 phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/05723b19
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/05723b19
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/05723b19
Branch: refs/heads/calcite
Commit: 05723b19cc107b72d9f752cd0441af260fc62e22
Parents: 3d50147
Author: maryannxue <we...@intel.com>
Authored: Wed Feb 25 17:33:41 2015 -0500
Committer: maryannxue <we...@intel.com>
Committed: Wed Feb 25 17:33:41 2015 -0500
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/HashJoinIT.java | 90 ++++++++++++++++++++
.../apache/phoenix/end2end/SortMergeJoinIT.java | 78 ++++++++++++++++-
.../apache/phoenix/compile/QueryCompiler.java | 37 ++++----
3 files changed, 188 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/05723b19/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 03686f0..e915b36 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -41,6 +41,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Collection;
@@ -464,6 +465,21 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
" CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
" CLIENT MERGE SORT\n" +
" DYNAMIC SERVER FILTER BY (COL0, COL1, COL2) IN ((RHS.COL1, RHS.COL2, TO_INTEGER((RHS.COL3 - 1))))",
+ /*
+ * testJoinWithSetMaxRows()
+ * statement.setMaxRows(4);
+ * SELECT order_id, i.name, quantity FROM joinItemTable i
+ * JOIN joinOrderTable o ON o.item_id = i.item_id;
+ * SELECT o.order_id, i.name, o.quantity FROM joinItemTable i
+ * JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o
+ * ON o.item_id = i.item_id;
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"item_id\" IN (\"O.item_id\")\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
}});
testCases.add(new String[][] {
{
@@ -831,6 +847,21 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
" CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
" CLIENT MERGE SORT\n" +
" DYNAMIC SERVER FILTER BY (COL0, COL1, COL2) IN ((RHS.COL1, RHS.COL2, TO_INTEGER((RHS.COL3 - 1))))",
+ /*
+ * testJoinWithSetMaxRows()
+ * statement.setMaxRows(4);
+ * SELECT order_id, i.name, quantity FROM joinItemTable i
+ * JOIN joinOrderTable o ON o.item_id = i.item_id;
+ * SELECT o.order_id, i.name, o.quantity FROM joinItemTable i
+ * JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o
+ * ON o.item_id = i.item_id;
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".OrderTable\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
}});
testCases.add(new String[][] {
{
@@ -1221,6 +1252,23 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
" CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
" CLIENT MERGE SORT\n" +
" DYNAMIC SERVER FILTER BY (COL0, COL1, COL2) IN ((RHS.COL1, RHS.COL2, TO_INTEGER((RHS.COL3 - 1))))",
+ /*
+ * testJoinWithSetMaxRows()
+ * statement.setMaxRows(4);
+ * SELECT order_id, i.name, quantity FROM joinItemTable i
+ * JOIN joinOrderTable o ON o.item_id = i.item_id;
+ * SELECT o.order_id, i.name, o.quantity FROM joinItemTable i
+ * JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o
+ * ON o.item_id = i.item_id;
+ */
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ "CLIENT MERGE SORT\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"item_id\" IN (\"O.item_id\")\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
}});
return testCases;
}
@@ -3734,6 +3782,48 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
}
}
+ @Test
+ public void testJoinWithSetMaxRows() throws Exception { // checks that Statement.setMaxRows(4) caps the rows returned by a join
+ String [] queries = new String[2];
+ queries[0] = "SELECT \"order_id\", i.name, quantity FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN "
+ + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\""; // join against the order table directly
+ queries[1] = "SELECT o.\"order_id\", i.name, o.quantity FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN "
+ + "(SELECT \"order_id\", \"item_id\", quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + ") o "
+ + "ON o.\"item_id\" = i.\"item_id\""; // same join, with the order table wrapped in a derived table
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ for (String query : queries) { // both query shapes are held to the same expectations
+ Statement statement = conn.createStatement();
+ statement.setMaxRows(4); // row cap set on the statement; neither query text contains a LIMIT clause
+ ResultSet rs = statement.executeQuery(query);
+ assertTrue (rs.next());
+ assertEquals("000000000000001", rs.getString(1)); // JUnit order is (expected, actual)
+ assertEquals("T1", rs.getString(2));
+ assertEquals(1000, rs.getInt(3));
+ assertTrue (rs.next());
+ assertEquals("000000000000003", rs.getString(1));
+ assertEquals("T2", rs.getString(2));
+ assertEquals(3000, rs.getInt(3));
+ assertTrue (rs.next());
+ assertEquals("000000000000005", rs.getString(1));
+ assertEquals("T3", rs.getString(2));
+ assertEquals(5000, rs.getInt(3));
+ assertTrue (rs.next());
+ assertEquals("000000000000002", rs.getString(1));
+ assertEquals("T6", rs.getString(2));
+ assertEquals(2000, rs.getInt(3));
+
+ assertFalse(rs.next()); // no fifth row may be returned
+
+ rs = statement.executeQuery("EXPLAIN " + query);
+ assertEquals(plans[25], QueryUtil.getExplainPlan(rs)); // EXPLAIN output must equal the expected plan text in plans[25]
+ }
+ } finally {
+ conn.close();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/05723b19/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
index 7912803..6f14a45 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
@@ -40,6 +40,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Collection;
@@ -120,6 +121,14 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
" SERVER SORTED BY [\"O.item_id\"]\n" +
" CLIENT MERGE SORT\n" +
" CLIENT SORTED BY [\"I.supplier_id\"]",
+
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER SORTED BY [\"O.item_id\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ "CLIENT 4 ROW LIMIT",
}});
testCases.add(new String[][] {
{
@@ -142,7 +151,18 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
" SERVER FILTER BY QUANTITY < 5000\n" +
" SERVER SORTED BY [\"O.item_id\"]\n" +
" CLIENT MERGE SORT\n" +
- " CLIENT SORTED BY [\"I.0:supplier_id\"]"
+ " CLIENT SORTED BY [\"I.0:supplier_id\"]",
+
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER SORTED BY [\"I.:item_id\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER SORTED BY [\"O.item_id\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ "CLIENT 4 ROW LIMIT",
}});
testCases.add(new String[][] {
{
@@ -165,7 +185,18 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
" SERVER FILTER BY QUANTITY < 5000\n" +
" SERVER SORTED BY [\"O.item_id\"]\n" +
" CLIENT MERGE SORT\n" +
- " CLIENT SORTED BY [\"I.0:supplier_id\"]"
+ " CLIENT SORTED BY [\"I.0:supplier_id\"]",
+
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER SORTED BY [\"I.:item_id\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+ " SERVER SORTED BY [\"O.item_id\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ "CLIENT 4 ROW LIMIT",
}});
return testCases;
}
@@ -2584,5 +2615,48 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
}
}
+ @Test
+ public void testJoinWithSetMaxRows() throws Exception { // checks that Statement.setMaxRows(4) caps the rows returned by a sort-merge join
+ String [] queries = new String[2];
+ queries[0] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, quantity FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN "
+ + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\""; // join against the order table directly
+ queries[1] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ o.\"order_id\", i.name, o.quantity FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN "
+ + "(SELECT \"order_id\", \"item_id\", quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + ") o "
+ + "ON o.\"item_id\" = i.\"item_id\""; // same join, with the order table wrapped in a derived table
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ for (int i = 0; i < queries.length; i++) { // index is needed below to pick the expected plan variant
+ String query = queries[i];
+ Statement statement = conn.createStatement();
+ statement.setMaxRows(4); // row cap set on the statement; neither query text contains a LIMIT clause
+ ResultSet rs = statement.executeQuery(query);
+ assertTrue (rs.next());
+ assertEquals("000000000000001", rs.getString(1)); // JUnit order is (expected, actual)
+ assertEquals("T1", rs.getString(2));
+ assertEquals(1000, rs.getInt(3));
+ assertTrue (rs.next());
+ assertEquals("000000000000003", rs.getString(1));
+ assertEquals("T2", rs.getString(2));
+ assertEquals(3000, rs.getInt(3));
+ assertTrue (rs.next());
+ assertEquals("000000000000005", rs.getString(1));
+ assertEquals("T3", rs.getString(2));
+ assertEquals(5000, rs.getInt(3));
+ assertTrue (rs.next());
+ assertTrue(rs.getString(1).equals("000000000000002") || rs.getString(1).equals("000000000000004")); // either of these two orders is accepted as the fourth row
+ assertEquals("T6", rs.getString(2));
+ assertTrue(rs.getInt(3) == 2000 || rs.getInt(3) == 4000);
+
+ assertFalse(rs.next()); // no fifth row may be returned
+
+ rs = statement.executeQuery("EXPLAIN " + query);
+ assertEquals(i == 0 ? plans[1] : plans[1].replaceFirst("O\\.item_id", "item_id"), QueryUtil.getExplainPlan(rs)); // second query's expected plan is plans[1] with the first "O.item_id" replaced by "item_id"
+ }
+ } finally {
+ conn.close();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/05723b19/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 9642489..137f4e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -277,8 +277,8 @@ public class QueryCompiler {
QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin());
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
Integer limit = null;
- if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
- limit = LimitCompiler.compile(context, query);
+ if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
+ limit = plan.getLimit();
}
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit, forceProjection);
return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, subPlans);
@@ -333,8 +333,8 @@ public class QueryCompiler {
QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
Integer limit = null;
- if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
- limit = LimitCompiler.compile(context, rhs);
+ if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
+ limit = rhsPlan.getLimit();
}
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsTable}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
@@ -429,16 +429,21 @@ public class QueryCompiler {
}
protected QueryPlan compileSubquery(SelectStatement subquery) throws SQLException {
- subquery = SubselectRewriter.flatten(subquery, this.statement.getConnection());
- ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, this.statement.getConnection());
+ PhoenixConnection connection = this.statement.getConnection(); // fetched once and reused by the rewrite steps below
+ subquery = SubselectRewriter.flatten(subquery, connection);
+ ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, connection);
subquery = StatementNormalizer.normalize(subquery, resolver);
- SelectStatement transformedSubquery = SubqueryRewriter.transform(subquery, resolver, this.statement.getConnection());
+ SelectStatement transformedSubquery = SubqueryRewriter.transform(subquery, resolver, connection);
if (transformedSubquery != subquery) {
- resolver = FromCompiler.getResolverForQuery(transformedSubquery, this.statement.getConnection());
+ resolver = FromCompiler.getResolverForQuery(transformedSubquery, connection); // statement was rewritten, so resolve and normalize it again
subquery = StatementNormalizer.normalize(transformedSubquery, resolver);
}
+ int maxRows = this.statement.getMaxRows(); // remember the current row cap so it can be put back afterwards
+ this.statement.setMaxRows(0); // clear maxRows while the inner query is compiled so the cap does not affect inner queries.
QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver).compile();
- return statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+ plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+ this.statement.setMaxRows(maxRows); // restore maxRows. NOTE(review): not inside a finally block, so an exception thrown by compile() or optimize() above leaves the statement's maxRows at 0.
+ return plan;
}
protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
@@ -490,12 +495,14 @@ public class QueryCompiler {
RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns);
// Final step is to build the query plan
- int maxRows = statement.getMaxRows();
- if (maxRows > 0) {
- if (limit != null) {
- limit = Math.min(limit, maxRows);
- } else {
- limit = maxRows;
+ if (!asSubquery) {
+ int maxRows = statement.getMaxRows();
+ if (maxRows > 0) {
+ if (limit != null) {
+ limit = Math.min(limit, maxRows);
+ } else {
+ limit = maxRows;
+ }
}
}