You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:41:12 UTC

[37/50] [abbrv] phoenix git commit: PHOENIX-1680 phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java

PHOENIX-1680 phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/05723b19
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/05723b19
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/05723b19

Branch: refs/heads/calcite
Commit: 05723b19cc107b72d9f752cd0441af260fc62e22
Parents: 3d50147
Author: maryannxue <we...@intel.com>
Authored: Wed Feb 25 17:33:41 2015 -0500
Committer: maryannxue <we...@intel.com>
Committed: Wed Feb 25 17:33:41 2015 -0500

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 90 ++++++++++++++++++++
 .../apache/phoenix/end2end/SortMergeJoinIT.java | 78 ++++++++++++++++-
 .../apache/phoenix/compile/QueryCompiler.java   | 37 ++++----
 3 files changed, 188 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/05723b19/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 03686f0..e915b36 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -41,6 +41,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
@@ -464,6 +465,21 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
                 "        CLIENT MERGE SORT\n" +
                 "    DYNAMIC SERVER FILTER BY (COL0, COL1, COL2) IN ((RHS.COL1, RHS.COL2, TO_INTEGER((RHS.COL3 - 1))))",
+                /*
+                 * testJoinWithSetMaxRows()
+                 *     statement.setMaxRows(4);
+                 *     SELECT order_id, i.name, quantity FROM joinItemTable i
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id;
+                 *     SELECT o.order_id, i.name, o.quantity FROM joinItemTable i
+                 *     JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o
+                 *     ON o.item_id = i.item_id;
+                 */
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "CLIENT 4 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY \"item_id\" IN (\"O.item_id\")\n" +
+                "    JOIN-SCANNER 4 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -831,6 +847,21 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
                 "        CLIENT MERGE SORT\n" +
                 "    DYNAMIC SERVER FILTER BY (COL0, COL1, COL2) IN ((RHS.COL1, RHS.COL2, TO_INTEGER((RHS.COL3 - 1))))",
+                /*
+                 * testJoinWithSetMaxRows()
+                 *     statement.setMaxRows(4);
+                 *     SELECT order_id, i.name, quantity FROM joinItemTable i
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id;
+                 *     SELECT o.order_id, i.name, o.quantity FROM joinItemTable i
+                 *     JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o
+                 *     ON o.item_id = i.item_id;
+                 */
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" +
+                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                "CLIENT 4 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".OrderTable\n" +
+                "    JOIN-SCANNER 4 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -1221,6 +1252,23 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
                 "        CLIENT MERGE SORT\n" +
                 "    DYNAMIC SERVER FILTER BY (COL0, COL1, COL2) IN ((RHS.COL1, RHS.COL2, TO_INTEGER((RHS.COL3 - 1))))",
+                /*
+                 * testJoinWithSetMaxRows()
+                 *     statement.setMaxRows(4);
+                 *     SELECT order_id, i.name, quantity FROM joinItemTable i
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id;
+                 *     SELECT o.order_id, i.name, o.quantity FROM joinItemTable i
+                 *     JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o
+                 *     ON o.item_id = i.item_id;
+                 */
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" +
+                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT 4 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY \"item_id\" IN (\"O.item_id\")\n" +
+                "    JOIN-SCANNER 4 ROW LIMIT",
                 }});
         return testCases;
     }
@@ -3734,6 +3782,48 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         }
     }
 
+    @Test
+    public void testJoinWithSetMaxRows() throws Exception {
+        String [] queries = new String[2];
+        queries[0] = "SELECT \"order_id\", i.name, quantity FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN "
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\"";
+        queries[1] = "SELECT o.\"order_id\", i.name, o.quantity FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " 
+                + "(SELECT \"order_id\", \"item_id\", quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + ") o " 
+                + "ON o.\"item_id\" = i.\"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            for (String query : queries) {
+                Statement statement = conn.createStatement();
+                statement.setMaxRows(4);
+                ResultSet rs = statement.executeQuery(query);
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000001");
+                assertEquals(rs.getString(2), "T1");
+                assertEquals(rs.getInt(3), 1000);
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000003");
+                assertEquals(rs.getString(2), "T2");
+                assertEquals(rs.getInt(3), 3000);
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000005");
+                assertEquals(rs.getString(2), "T3");
+                assertEquals(rs.getInt(3), 5000);
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000002");
+                assertEquals(rs.getString(2), "T6");
+                assertEquals(rs.getInt(3), 2000);
+
+                assertFalse(rs.next());
+                
+                rs = statement.executeQuery("EXPLAIN " + query);
+                assertEquals(plans[25], QueryUtil.getExplainPlan(rs));
+            }
+        } finally {
+            conn.close();
+        }
+    }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05723b19/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
index 7912803..6f14a45 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
@@ -40,6 +40,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
@@ -120,6 +121,14 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
                 "            SERVER SORTED BY [\"O.item_id\"]\n" +
                 "        CLIENT MERGE SORT\n" +
                 "    CLIENT SORTED BY [\"I.supplier_id\"]",
+                
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "        SERVER SORTED BY [\"O.item_id\"]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "CLIENT 4 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -142,7 +151,18 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
                 "            SERVER FILTER BY QUANTITY < 5000\n" +
                 "            SERVER SORTED BY [\"O.item_id\"]\n" +
                 "        CLIENT MERGE SORT\n" +
-                "    CLIENT SORTED BY [\"I.0:supplier_id\"]"
+                "    CLIENT SORTED BY [\"I.0:supplier_id\"]",
+                
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+                "        SERVER FILTER BY FIRST KEY ONLY\n" +
+                "        SERVER SORTED BY [\"I.:item_id\"]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "        SERVER SORTED BY [\"O.item_id\"]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "CLIENT 4 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -165,7 +185,18 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
                 "            SERVER FILTER BY QUANTITY < 5000\n" +
                 "            SERVER SORTED BY [\"O.item_id\"]\n" +
                 "        CLIENT MERGE SORT\n" +
-                "    CLIENT SORTED BY [\"I.0:supplier_id\"]"
+                "    CLIENT SORTED BY [\"I.0:supplier_id\"]",
+                
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" +
+                "        SERVER FILTER BY FIRST KEY ONLY\n" +
+                "        SERVER SORTED BY [\"I.:item_id\"]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "        SERVER SORTED BY [\"O.item_id\"]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "CLIENT 4 ROW LIMIT",
                 }});
         return testCases;
     }
@@ -2584,5 +2615,48 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
         }
     }
 
+    @Test
+    public void testJoinWithSetMaxRows() throws Exception {
+        String [] queries = new String[2];
+        queries[0] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, quantity FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN "
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\"";
+        queries[1] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ o.\"order_id\", i.name, o.quantity FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " 
+                + "(SELECT \"order_id\", \"item_id\", quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + ") o " 
+                + "ON o.\"item_id\" = i.\"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            for (int i = 0; i < queries.length; i++) {
+                String query = queries[i];
+                Statement statement = conn.createStatement();
+                statement.setMaxRows(4);
+                ResultSet rs = statement.executeQuery(query);
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000001");
+                assertEquals(rs.getString(2), "T1");
+                assertEquals(rs.getInt(3), 1000);
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000003");
+                assertEquals(rs.getString(2), "T2");
+                assertEquals(rs.getInt(3), 3000);
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000005");
+                assertEquals(rs.getString(2), "T3");
+                assertEquals(rs.getInt(3), 5000);
+                assertTrue (rs.next());
+                assertTrue(rs.getString(1).equals("000000000000002") || rs.getString(1).equals("000000000000004"));
+                assertEquals(rs.getString(2), "T6");
+                assertTrue(rs.getInt(3) == 2000 || rs.getInt(3) == 4000);
+
+                assertFalse(rs.next());
+                
+                rs = statement.executeQuery("EXPLAIN " + query);
+                assertEquals(i == 0 ? plans[1] : plans[1].replaceFirst("O\\.item_id", "item_id"), QueryUtil.getExplainPlan(rs));
+            }
+        } finally {
+            conn.close();
+        }
+    }
+
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05723b19/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 9642489..137f4e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -277,8 +277,8 @@ public class QueryCompiler {
             QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin());
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
             Integer limit = null;
-            if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
-                limit = LimitCompiler.compile(context, query);
+            if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
+                limit = plan.getLimit();
             }
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit, forceProjection);
             return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, subPlans);
@@ -333,8 +333,8 @@ public class QueryCompiler {
             QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right);
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
             Integer limit = null;
-            if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
-                limit = LimitCompiler.compile(context, rhs);
+            if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
+                limit = rhsPlan.getLimit();
             }
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsTable}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
             Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
@@ -429,16 +429,21 @@ public class QueryCompiler {
     }
 
     protected QueryPlan compileSubquery(SelectStatement subquery) throws SQLException {
-        subquery = SubselectRewriter.flatten(subquery, this.statement.getConnection());
-        ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, this.statement.getConnection());
+        PhoenixConnection connection = this.statement.getConnection();
+        subquery = SubselectRewriter.flatten(subquery, connection);
+        ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, connection);
         subquery = StatementNormalizer.normalize(subquery, resolver);
-        SelectStatement transformedSubquery = SubqueryRewriter.transform(subquery, resolver, this.statement.getConnection());
+        SelectStatement transformedSubquery = SubqueryRewriter.transform(subquery, resolver, connection);
         if (transformedSubquery != subquery) {
-            resolver = FromCompiler.getResolverForQuery(transformedSubquery, this.statement.getConnection());
+            resolver = FromCompiler.getResolverForQuery(transformedSubquery, connection);
             subquery = StatementNormalizer.normalize(transformedSubquery, resolver);
         }
+        int maxRows = this.statement.getMaxRows();
+        this.statement.setMaxRows(0); // overwrite maxRows to avoid its impact on inner queries.
         QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver).compile();
-        return statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+        plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+        this.statement.setMaxRows(maxRows); // restore maxRows.
+        return plan;
     }
 
     protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
@@ -490,12 +495,14 @@ public class QueryCompiler {
         RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns);
 
         // Final step is to build the query plan
-        int maxRows = statement.getMaxRows();
-        if (maxRows > 0) {
-            if (limit != null) {
-                limit = Math.min(limit, maxRows);
-            } else {
-                limit = maxRows;
+        if (!asSubquery) {
+            int maxRows = statement.getMaxRows();
+            if (maxRows > 0) {
+                if (limit != null) {
+                    limit = Math.min(limit, maxRows);
+                } else {
+                    limit = maxRows;
+                }
             }
         }