You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2018/02/12 22:07:45 UTC
[2/2] phoenix git commit: PHOENIX-1556 Base hash versus sort merge
join decision on cost
PHOENIX-1556 Base hash versus sort merge join decision on cost
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9461d0d6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9461d0d6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9461d0d6
Branch: refs/heads/master
Commit: 9461d0d6a299bb3cbcf53905d7e9b73895a99299
Parents: a6bf735
Author: maryannxue <ma...@gmail.com>
Authored: Mon Feb 12 14:07:30 2018 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Mon Feb 12 14:07:30 2018 -0800
----------------------------------------------------------------------
.../phoenix/end2end/CostBasedDecisionIT.java | 420 ++++++++++++-----
.../apache/phoenix/compile/JoinCompiler.java | 43 ++
.../phoenix/compile/ListJarsQueryPlan.java | 8 +-
.../apache/phoenix/compile/QueryCompiler.java | 449 ++++++++++---------
.../org/apache/phoenix/compile/QueryPlan.java | 2 +
.../apache/phoenix/compile/TraceQueryPlan.java | 6 +
.../apache/phoenix/execute/AggregatePlan.java | 41 +-
.../phoenix/execute/ClientAggregatePlan.java | 46 +-
.../phoenix/execute/ClientProcessingPlan.java | 4 +
.../apache/phoenix/execute/ClientScanPlan.java | 22 +-
.../apache/phoenix/execute/CorrelatePlan.java | 26 +-
.../apache/phoenix/execute/CursorFetchPlan.java | 6 +
.../apache/phoenix/execute/HashJoinPlan.java | 128 ++++--
.../execute/LiteralResultIterationPlan.java | 6 +
.../org/apache/phoenix/execute/ScanPlan.java | 14 +-
.../phoenix/execute/SortMergeJoinPlan.java | 20 +-
.../phoenix/execute/TupleProjectionPlan.java | 6 +
.../org/apache/phoenix/execute/UnionPlan.java | 12 +-
.../apache/phoenix/execute/UnnestArrayPlan.java | 6 +
.../execute/visitor/AvgRowWidthVisitor.java | 205 +++++++++
.../execute/visitor/ByteCountVisitor.java | 125 ++++++
.../execute/visitor/QueryPlanVisitor.java | 46 ++
.../execute/visitor/RowCountVisitor.java | 335 ++++++++++++++
.../apache/phoenix/jdbc/PhoenixStatement.java | 6 +
.../java/org/apache/phoenix/util/CostUtil.java | 61 +--
.../query/ParallelIteratorsSplitTest.java | 6 +
26 files changed, 1612 insertions(+), 437 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
index a3584ce..493855a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -32,12 +32,16 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Maps;
public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+ private final String testTable500;
+ private final String testTable990;
+ private final String testTable1000;
@BeforeClass
public static void doSetup() throws Exception {
@@ -46,9 +50,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true));
+ props.put(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, Long.toString(150000));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
+ public CostBasedDecisionIT() throws Exception {
+ testTable500 = initTestTableValues(500);
+ testTable990 = initTestTableValues(990);
+ testTable1000 = initTestTableValues(1000);
+ }
+
@Test
public void testCostOverridesStaticPlanOrdering1() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -64,10 +75,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey";
// Use the data table plan that opts out order-by when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
- plan.contains("FULL SCAN"));
+ verifyQueryPlan(query, "FULL SCAN");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -81,10 +89,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the index table plan that has a lower cost when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
- plan.contains("RANGE SCAN"));
+ verifyQueryPlan(query, "RANGE SCAN");
} finally {
conn.close();
}
@@ -103,12 +108,12 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
"c2 VARCHAR)");
conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
- String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+ String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1";
// Use the index table plan that opts out order-by when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
- plan.contains("RANGE SCAN"));
+ verifyQueryPlan(query,
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+ "CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -124,10 +129,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
// Given that the range on C1 is meaningless and group-by becomes
// order-preserving if using the data table, the data table plan should
// come out as the best plan based on the costs.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
- plan.contains("FULL SCAN"));
+ verifyQueryPlan(query,
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+ "CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -150,14 +156,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
// Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String indexPlan =
+ verifyQueryPlan(query,
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
" SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ "CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -171,14 +173,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the idx2 plan that scans less data when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String dataPlan =
+ verifyQueryPlan(query,
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
" SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
- plan.contains(dataPlan));
+ "CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -201,15 +199,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
// Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String indexPlan =
+ verifyQueryPlan(query,
"UPSERT SELECT\n" +
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
" SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ "CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -223,15 +217,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the idx2 plan that scans less data when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String dataPlan =
+ verifyQueryPlan(query,
"UPSERT SELECT\n" +
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
" SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
- plan.contains(dataPlan));
+ "CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -254,15 +244,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
// Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String indexPlan =
+ verifyQueryPlan(query,
"DELETE ROWS\n" +
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
" SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ "CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -276,15 +262,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the idx2 plan that scans less data when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String dataPlan =
+ verifyQueryPlan(query,
"DELETE ROWS\n" +
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
" SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
- plan.contains(dataPlan));
+ "CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -303,22 +285,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
"c2 VARCHAR)");
conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
- String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 "
- + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+ String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1 "
+ + "UNION ALL SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey >= 'a' GROUP BY c1";
// Use the default plan when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String defaultPlan =
+ verifyQueryPlan(query,
"UNION ALL OVER 2 QUERIES\n" +
- " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
" SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
" CLIENT MERGE SORT\n" +
- " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
- " SERVER FILTER BY FIRST KEY ONLY\n" +
- " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
- " CLIENT MERGE SORT";
- assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
- plan.contains(defaultPlan));
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['a'] - [*]\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+ " CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -332,19 +309,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the optimal plan based on cost when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String optimizedPlan =
+ verifyQueryPlan(query,
"UNION ALL OVER 2 QUERIES\n" +
" CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
- " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
" CLIENT MERGE SORT\n" +
- " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
- " SERVER FILTER BY C1 LIKE 'X%'\n" +
- " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]";
- assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
- plan.contains(optimizedPlan));
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" >= 'a'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+ " CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -363,23 +337,18 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
"c2 VARCHAR)");
conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
- String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 "
- + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
- + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
+ String query = "SELECT t1.rowkey, t1.c1, t1.c2, t2.c1, mc2 FROM " + tableName + " t1 "
+ + "JOIN (SELECT c1, max(rowkey) mrk, max(c2) mc2 FROM " + tableName + " where rowkey <= 'z' GROUP BY c1) t2 "
+ + "ON t1.rowkey = t2.mrk WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
// Use the default plan when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String defaultPlan =
+ verifyQueryPlan(query,
"CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
" SERVER FILTER BY C1 LIKE 'X0%'\n" +
" PARALLEL INNER-JOIN TABLE 0\n" +
- " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
- " SERVER FILTER BY FIRST KEY ONLY\n" +
- " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
" CLIENT MERGE SORT\n" +
- " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
- assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
- plan.contains(defaultPlan));
+ " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.MRK)");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -393,20 +362,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the optimal plan based on cost when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String optimizedPlan =
+ verifyQueryPlan(query,
"CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" +
" SERVER FILTER BY FIRST KEY ONLY\n" +
" SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
"CLIENT MERGE SORT\n" +
" PARALLEL INNER-JOIN TABLE 0\n" +
- " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
- " SERVER FILTER BY C1 LIKE 'X%'\n" +
- " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" +
- " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)";
- assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
- plan.contains(optimizedPlan));
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.MRK)");
} finally {
conn.close();
}
@@ -432,10 +398,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
// Use the index table plan that opts out order-by when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ verifyQueryPlan(query, indexPlan);
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -449,18 +412,261 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the data table plan that has a lower cost when stats are available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
- plan.contains(dataPlan));
+ verifyQueryPlan(query, dataPlan);
// Use the index table plan as has been hinted.
- rs = conn.createStatement().executeQuery("explain " + hintedQuery);
- plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ verifyQueryPlan(hintedQuery, indexPlan);
} finally {
conn.close();
}
}
+
+ /** Sort-merge-join w/ both children ordered wins over hash-join. */
+ @Test
+ public void testJoinStrategy() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.ID";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000;
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Sort-merge-join w/ both children ordered wins over hash-join in an un-grouped aggregate query. */
+ @Test
+ public void testJoinStrategy2() throws Exception {
+ String q = "SELECT count(*)\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.ID\n" +
+ "WHERE t1.COL1 < 200";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+ " SERVER FILTER BY COL1 < 200\n" +
+ "AND (SKIP MERGE)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ "CLIENT AGGREGATE INTO SINGLE ROW";
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Hash-join w/ PK/FK optimization wins over sort-merge-join w/ larger side ordered. */
+ @Test
+ public void testJoinStrategy3() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.COL1 = t2.ID\n" +
+ "WHERE t1.ID > 200";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+ " DYNAMIC SERVER FILTER BY T2.ID IN (T1.COL1)";
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Hash-join w/ PK/FK optimization wins over hash-join w/o PK/FK optimization when two sides are close in size. */
+ @Test
+ public void testJoinStrategy4() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable990 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.COL1";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)";
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Hash-join wins over sort-merge-join w/ smaller side ordered. */
+ @Test
+ public void testJoinStrategy5() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.COL1\n" +
+ "WHERE t1.ID > 200";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Hash-join wins over sort-merge-join w/o any side ordered. */
+ @Test
+ public void testJoinStrategy6() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.COL1 = t2.COL1\n" +
+ "WHERE t1.ID > 200";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Hash-join wins over sort-merge-join w/ both sides ordered in an order-by query.
+ * This is because order-by can only be done on the client side after sort-merge-join
+ * and order-by w/o limit on the client side is very expensive.
+ */
+ @Test
+ public void testJoinStrategy7() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.ID\n" +
+ "ORDER BY t1.COL1";
+ String expected =
+ "CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " SERVER SORTED BY [T1.COL1]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+ " DYNAMIC SERVER FILTER BY T2.ID IN (T1.ID)";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Sort-merge-join w/ both sides ordered wins over hash-join in an order-by limit query.
+ * This is because order-by can only be done on the client side after sort-merge-join
+ * but order-by w/ limit on the client side is less expensive.
+ */
+ @Test
+ public void testJoinStrategy8() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.ID\n" +
+ "ORDER BY t1.COL1 LIMIT 5";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ "CLIENT TOP 5 ROWS SORTED BY [T1.COL1]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Multi-table join: sort-merge-join chosen since all join keys are PK.
+ */
+ @Test
+ public void testJoinStrategy9() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable1000 + " t1 LEFT JOIN " + testTable500 + " t2\n" +
+ "ON t1.ID = t2.ID AND t2.ID > 200\n" +
+ "LEFT JOIN " + testTable990 + " t3\n" +
+ "ON t1.ID = t3.ID AND t3.ID < 100";
+ String expected =
+ "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+ " SORT-MERGE-JOIN (LEFT) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " AND\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Multi-table join: a mix of join strategies chosen based on cost.
+ */
+ @Test
+ public void testJoinStrategy10() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" +
+ "ON t1.ID = t2.COL1 AND t2.ID > 200\n" +
+ "JOIN " + testTable990 + " t3\n" +
+ "ON t1.ID = t3.ID AND t3.ID < 100";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+ " DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Multi-table join: hash-join two tables in parallel since two RHS tables are both small
+ * and can fit in memory at the same time.
+ */
+ @Test
+ public void testJoinStrategy11() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" +
+ "ON t1.COL2 = t2.COL1 AND t2.ID > 200\n" +
+ "JOIN " + testTable990 + " t3\n" +
+ "ON t1.COL1 = t3.COL2 AND t3.ID < 100";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+ " PARALLEL INNER-JOIN TABLE 1\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Multi-table join: similar to {@link #testJoinStrategy11()}, but the two RHS
+ * tables cannot fit in memory at the same time, and thus a mix of join strategies
+ * is chosen based on cost.
+ */
+ @Test
+ public void testJoinStrategy12() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable1000 + " t1 JOIN " + testTable990 + " t2\n" +
+ "ON t1.COL2 = t2.COL1\n" +
+ "JOIN " + testTable990 + " t3\n" +
+ "ON t1.COL1 = t3.COL2";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " SERVER SORTED BY [T1.COL1]\n" +
+ " CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 991-WAY FULL SCAN OVER " + testTable990 + "\n" +
+ " SERVER SORTED BY [T3.COL2]\n" +
+ " CLIENT MERGE SORT";
+ verifyQueryPlan(q, expected);
+ }
+
+ private static void verifyQueryPlan(String query, String expected) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected '" + expected + "' in the plan:\n" + plan + ".",
+ plan.contains(expected));
+ }
+
+ private static String initTestTableValues(int rows) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String tableName = generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "ID INTEGER NOT NULL PRIMARY KEY,\n" +
+ "COL1 INTEGER," +
+ "COL2 INTEGER)");
+ PreparedStatement stmt = conn.prepareStatement(
+ "UPSERT INTO " + tableName + " VALUES(?, ?, ?)");
+ for (int i = 0; i < rows; i++) {
+ stmt.setInt(1, i + 1);
+ stmt.setInt(2, rows - i);
+ stmt.setInt(3, rows + i);
+ stmt.execute();
+ }
+ conn.commit();
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+ return tableName;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f3c4c24..f5a7e39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -71,6 +71,8 @@ import org.apache.phoenix.parse.TableNodeVisitor;
import org.apache.phoenix.parse.TableWildcardParseNode;
import org.apache.phoenix.parse.UDFParseNode;
import org.apache.phoenix.parse.WildcardParseNode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.LocalIndexDataColumnRef;
@@ -124,6 +126,8 @@ public class JoinCompiler {
private final ColumnResolver origResolver;
private final boolean useStarJoin;
private final Map<ColumnRef, ColumnRefType> columnRefs;
+ private final boolean useSortMergeJoin;
+ private final boolean costBased;
private JoinCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) {
@@ -132,6 +136,9 @@ public class JoinCompiler {
this.origResolver = resolver;
this.useStarJoin = !select.getHint().hasHint(Hint.NO_STAR_JOIN);
this.columnRefs = new HashMap<ColumnRef, ColumnRefType>();
+ this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
+ this.costBased = statement.getConnection().getQueryServices().getProps().getBoolean(
+ QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
}
public static JoinTable compile(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
@@ -365,6 +372,42 @@ public class JoinCompiler {
}
/**
+ * Return a list of all applicable join strategies. The order of the strategies in the
+ * returned list is based on the static rule below. However, the caller can decide on
+ * an optimal join strategy by evaluating and comparing the costs.
+ * 1. If hint USE_SORT_MERGE_JOIN is specified,
+ * return a singleton list containing only SORT_MERGE.
+ * 2. If 1) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B"; or
+ * 2) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B (LEFT/INNER/SEMI/ANTI JOIN C)+"
+ * and hint NO_STAR_JOIN is not specified,
+ * add BUILD_RIGHT to the returned list.
+ * 3. If matches pattern "A RIGHT/INNER JOIN B", where B is either a named table reference
+ * or a flat sub-query,
+ * add BUILD_LEFT to the returned list.
+ * 4. add SORT_MERGE to the returned list.
+ */
+ public List<Strategy> getApplicableJoinStrategies() {
+ List<Strategy> strategies = Lists.newArrayList();
+ if (useSortMergeJoin) {
+ strategies.add(Strategy.SORT_MERGE);
+ } else {
+ if (getStarJoinVector() != null) {
+ strategies.add(Strategy.HASH_BUILD_RIGHT);
+ }
+ JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+ JoinType type = lastJoinSpec.getType();
+ if ((type == JoinType.Right || type == JoinType.Inner)
+ && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
+ && lastJoinSpec.getJoinTable().getTable().isFlat()) {
+ strategies.add(Strategy.HASH_BUILD_LEFT);
+ }
+ strategies.add(Strategy.SORT_MERGE);
+ }
+
+ return strategies;
+ }
+
+ /**
* Returns a boolean vector indicating whether the evaluation of join expressions
* can be evaluated at an early stage if the input JoinSpec can be taken as a
* star join. Otherwise returns null.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 0688b94..e3ed110 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
@@ -251,7 +252,12 @@ public class ListJarsQueryPlan implements QueryPlan {
return false;
}
- @Override
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Set<TableRef> getSourceRefs() {
return Collections.<TableRef>emptySet();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3b14850..243f03e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -52,6 +53,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.EqualParseNode;
import org.apache.phoenix.parse.HintNode.Hint;
@@ -63,7 +65,10 @@ import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SubqueryParseNode;
import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PDatum;
@@ -102,9 +107,9 @@ public class QueryCompiler {
private final ParallelIteratorFactory parallelIteratorFactory;
private final SequenceManager sequenceManager;
private final boolean projectTuples;
- private final boolean useSortMergeJoin;
private final boolean noChildParentJoinOptimization;
private final QueryPlan dataPlan;
+ private final boolean costBased;
public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan);
@@ -119,9 +124,10 @@ public class QueryCompiler {
this.parallelIteratorFactory = parallelIteratorFactory;
this.sequenceManager = sequenceManager;
this.projectTuples = projectTuples;
- this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION);
- if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
+ ConnectionQueryServices services = statement.getConnection().getQueryServices();
+ this.costBased = services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+ if (services.getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
}
if (select.getHint().hasHint(Hint.NO_CACHE)) {
@@ -201,41 +207,17 @@ public class QueryCompiler {
}
}
- /*
+ /**
* Call compileJoinQuery() for join queries recursively down to the leaf JoinTable nodes.
- * This matches the input JoinTable node against patterns in the following order:
- * 1. A (leaf JoinTable node, which can be a named table reference or a subquery of any kind.)
- * Returns the compilation result of a single table scan or of an independent subquery.
- * 2. Matching either of (when hint USE_SORT_MERGE_JOIN not specified):
- * 1) A LEFT/INNER JOIN B
- * 2) A LEFT/INNER JOIN B (LEFT/INNER JOIN C)+, if hint NO_STAR_JOIN not specified
- * where A can be a named table reference or a flat subquery, and B, C, ... can be a named
- * table reference, a sub-join or a subquery of any kind.
- * Returns a HashJoinPlan{scan: A, hash: B, C, ...}.
- * 3. Matching pattern:
- * A RIGHT/INNER JOIN B (when hint USE_SORT_MERGE_JOIN not specified)
- * where B can be a named table reference or a flat subquery, and A can be a named table
- * reference, a sub-join or a subquery of any kind.
- * Returns a HashJoinPlan{scan: B, hash: A}.
- * NOTE that "A LEFT/RIGHT/INNER/FULL JOIN B RIGHT/INNER JOIN C" is viewed as
- * "(A LEFT/RIGHT/INNER/FULL JOIN B) RIGHT/INNER JOIN C" here, which means the left part in the
- * parenthesis is considered a sub-join.
- * viewed as a sub-join.
- * 4. All the rest that do not qualify for previous patterns or conditions, including FULL joins.
- * Returns a SortMergeJoinPlan, the sorting part of which is pushed down to the JoinTable nodes
- * of both sides as order-by clauses.
- * NOTE that SEMI or ANTI joins are treated the same way as LEFT joins in JoinTable pattern matching.
- *
- * If no join algorithm hint is provided, according to the above compilation process, a join query
- * plan can probably consist of both HashJoinPlan and SortMergeJoinPlan which may enclose each other.
- * TODO 1) Use table statistics to guide the choice of join plans.
- * 2) Make it possible to hint a certain join algorithm for a specific join step.
+ * If it is a leaf node, call compileSingleFlatQuery() or compileSubquery(), otherwise:
+ * 1) If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, return the
+ * join plan with the best cost. Note that the "best" plan is only locally optimal,
+ * and might or might not be globally optimal.
+ * 2) Otherwise, return the join plan compiled with the default strategy.
+ * @see JoinCompiler.JoinTable#getApplicableJoinStrategies()
*/
- @SuppressWarnings("unchecked")
protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
- byte[] emptyByteArray = new byte[0];
- List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
- if (joinSpecs.isEmpty()) {
+ if (joinTable.getJoinSpecs().isEmpty()) {
Table table = joinTable.getTable();
SelectStatement subquery = table.getAsSubquery(orderBy);
if (!table.isSubselect()) {
@@ -253,198 +235,229 @@ public class QueryCompiler {
return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), table.compilePostFilterExpression(context));
}
- boolean[] starJoinVector;
- if (!this.useSortMergeJoin && (starJoinVector = joinTable.getStarJoinVector()) != null) {
- Table table = joinTable.getTable();
- PTable initialProjectedTable;
- TableRef tableRef;
- SelectStatement query;
- TupleProjector tupleProjector;
- if (!table.isSubselect()) {
- context.setCurrentTable(table.getTableRef());
- initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
- tableRef = table.getTableRef();
- table.projectColumns(context.getScan());
- query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
- tupleProjector = new TupleProjector(initialProjectedTable);
- } else {
- SelectStatement subquery = table.getAsSubquery(orderBy);
- QueryPlan plan = compileSubquery(subquery, false);
- initialProjectedTable = table.createProjectedTable(plan.getProjector());
- tableRef = plan.getTableRef();
- context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
- query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
- tupleProjector = new TupleProjector(plan.getProjector());
+ List<JoinCompiler.Strategy> strategies = joinTable.getApplicableJoinStrategies();
+ assert strategies.size() > 0;
+ if (!costBased || strategies.size() == 1) {
+ return compileJoinQuery(
+ strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+ }
+
+ QueryPlan bestPlan = null;
+ Cost bestCost = null;
+ for (JoinCompiler.Strategy strategy : strategies) {
+ StatementContext newContext = new StatementContext(
+ context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager());
+ QueryPlan plan = compileJoinQuery(
+ strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+ Cost cost = plan.getCost();
+ if (bestPlan == null || cost.compareTo(bestCost) < 0) {
+ bestPlan = plan;
+ bestCost = cost;
}
- context.setCurrentTable(tableRef);
- PTable projectedTable = initialProjectedTable;
- int count = joinSpecs.size();
- ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
- List<Expression>[] joinExpressions = new List[count];
- JoinType[] joinTypes = new JoinType[count];
- PTable[] tables = new PTable[count];
- int[] fieldPositions = new int[count];
- StatementContext[] subContexts = new StatementContext[count];
- QueryPlan[] subPlans = new QueryPlan[count];
- HashSubPlan[] hashPlans = new HashSubPlan[count];
- fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
- for (int i = 0; i < count; i++) {
- JoinSpec joinSpec = joinSpecs.get(i);
- Scan subScan = ScanUtil.newScan(originalScan);
- subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
- boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
- if (hasPostReference) {
- tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
- projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
+ }
+ context.setResolver(bestPlan.getContext().getResolver());
+ context.setCurrentTable(bestPlan.getContext().getCurrentTable());
+ return bestPlan;
+ }
+
+ protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
+ byte[] emptyByteArray = new byte[0];
+ List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
+ switch (strategy) {
+ case HASH_BUILD_RIGHT: {
+ boolean[] starJoinVector = joinTable.getStarJoinVector();
+ Table table = joinTable.getTable();
+ PTable initialProjectedTable;
+ TableRef tableRef;
+ SelectStatement query;
+ TupleProjector tupleProjector;
+ if (!table.isSubselect()) {
+ context.setCurrentTable(table.getTableRef());
+ initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
+ tableRef = table.getTableRef();
+ table.projectColumns(context.getScan());
+ query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
+ tupleProjector = new TupleProjector(initialProjectedTable);
} else {
- tables[i] = null;
+ SelectStatement subquery = table.getAsSubquery(orderBy);
+ QueryPlan plan = compileSubquery(subquery, false);
+ initialProjectedTable = table.createProjectedTable(plan.getProjector());
+ tableRef = plan.getTableRef();
+ context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+ query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+ tupleProjector = new TupleProjector(plan.getProjector());
}
- }
- for (int i = 0; i < count; i++) {
- JoinSpec joinSpec = joinSpecs.get(i);
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
- joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
- Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT);
- joinExpressions[i] = joinConditions.getFirst();
- List<Expression> hashExpressions = joinConditions.getSecond();
- Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
- boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
- Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
- Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
- joinTypes[i] = joinSpec.getType();
- if (i < count - 1) {
- fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+ context.setCurrentTable(tableRef);
+ PTable projectedTable = initialProjectedTable;
+ int count = joinSpecs.size();
+ ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
+ List<Expression>[] joinExpressions = new List[count];
+ JoinType[] joinTypes = new JoinType[count];
+ PTable[] tables = new PTable[count];
+ int[] fieldPositions = new int[count];
+ StatementContext[] subContexts = new StatementContext[count];
+ QueryPlan[] subPlans = new QueryPlan[count];
+ HashSubPlan[] hashPlans = new HashSubPlan[count];
+ fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
+ for (int i = 0; i < count; i++) {
+ JoinSpec joinSpec = joinSpecs.get(i);
+ Scan subScan = ScanUtil.newScan(originalScan);
+ subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+ subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
+ boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
+ if (hasPostReference) {
+ tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
+ projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
+ } else {
+ tables[i] = null;
+ }
+ }
+ for (int i = 0; i < count; i++) {
+ JoinSpec joinSpec = joinSpecs.get(i);
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
+ joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
+ Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], strategy);
+ joinExpressions[i] = joinConditions.getFirst();
+ List<Expression> hashExpressions = joinConditions.getSecond();
+ Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
+ boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
+ Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
+ Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
+ joinTypes[i] = joinSpec.getType();
+ if (i < count - 1) {
+ fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+ }
+ hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+ }
+ TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+ QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+ Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
+ Integer limit = null;
+ Integer offset = null;
+ if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
+ limit = plan.getLimit();
+ offset = plan.getOffset();
}
- hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
+ starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+ return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
}
- TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
- QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
- Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
- Integer limit = null;
- Integer offset = null;
- if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
- limit = plan.getLimit();
- offset = plan.getOffset();
+ case HASH_BUILD_LEFT: {
+ JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+ JoinType type = lastJoinSpec.getType();
+ JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
+ Table rhsTable = rhsJoinTable.getTable();
+ JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+ Scan subScan = ScanUtil.newScan(originalScan);
+ StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
+ PTable rhsProjTable;
+ TableRef rhsTableRef;
+ SelectStatement rhs;
+ TupleProjector tupleProjector;
+ if (!rhsTable.isSubselect()) {
+ context.setCurrentTable(rhsTable.getTableRef());
+ rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
+ rhsTableRef = rhsTable.getTableRef();
+ rhsTable.projectColumns(context.getScan());
+ rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
+ tupleProjector = new TupleProjector(rhsProjTable);
+ } else {
+ SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
+ QueryPlan plan = compileSubquery(subquery, false);
+ rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
+ rhsTableRef = plan.getTableRef();
+ context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+ rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+ tupleProjector = new TupleProjector(plan.getProjector());
+ }
+ context.setCurrentTable(rhsTableRef);
+ context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
+ ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[]{new ImmutableBytesPtr(emptyByteArray)};
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, strategy);
+ List<Expression> joinExpressions = joinConditions.getSecond();
+ List<Expression> hashExpressions = joinConditions.getFirst();
+ boolean needsMerge = lhsJoin.hasPostReference();
+ PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
+ int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
+ PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
+ TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
+ QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+ Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
+ Integer limit = null;
+ Integer offset = null;
+ if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
+ limit = rhsPlan.getLimit();
+ offset = rhsPlan.getOffset();
+ }
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions},
+ new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true},
+ new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+ Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
+ getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
+ return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
}
- HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
- starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
- return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
- }
+ case SORT_MERGE: {
+ JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+ JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+ JoinType type = lastJoinSpec.getType();
+ JoinTable rhsJoin = lastJoinSpec.getJoinTable();
+ if (type == JoinType.Right) {
+ JoinTable temp = lhsJoin;
+ lhsJoin = rhsJoin;
+ rhsJoin = temp;
+ }
- JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
- JoinType type = lastJoinSpec.getType();
- if (!this.useSortMergeJoin
- && (type == JoinType.Right || type == JoinType.Inner)
- && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
- && lastJoinSpec.getJoinTable().getTable().isFlat()) {
- JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
- Table rhsTable = rhsJoinTable.getTable();
- JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
- Scan subScan = ScanUtil.newScan(originalScan);
- StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
- PTable rhsProjTable;
- TableRef rhsTableRef;
- SelectStatement rhs;
- TupleProjector tupleProjector;
- if (!rhsTable.isSubselect()) {
- context.setCurrentTable(rhsTable.getTableRef());
- rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
- rhsTableRef = rhsTable.getTableRef();
- rhsTable.projectColumns(context.getScan());
- rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
- tupleProjector = new TupleProjector(rhsProjTable);
- } else {
- SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
- QueryPlan plan = compileSubquery(subquery, false);
- rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
- rhsTableRef = plan.getTableRef();
- context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
- rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
- tupleProjector = new TupleProjector(plan.getProjector());
- }
- context.setCurrentTable(rhsTableRef);
- context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
- ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
- Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT);
- List<Expression> joinExpressions = joinConditions.getSecond();
- List<Expression> hashExpressions = joinConditions.getFirst();
- boolean needsMerge = lhsJoin.hasPostReference();
- PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
- int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
- PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
- TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
- QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
- Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
- Integer limit = null;
- Integer offset = null;
- if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
- limit = rhsPlan.getLimit();
- offset = rhsPlan.getOffset();
- }
- HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] { joinExpressions },
- new JoinType[] { type == JoinType.Right ? JoinType.Left : type }, new boolean[] { true },
- new PTable[] { lhsTable }, new int[] { fieldPosition }, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
- Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
- getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
- return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
- }
+ List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
+ List<OrderByNode> lhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size());
+ List<OrderByNode> rhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size());
+ for (EqualParseNode condition : joinConditionNodes) {
+ lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
+ rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
+ }
- JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
- JoinTable rhsJoin = lastJoinSpec.getJoinTable();
- if (type == JoinType.Right) {
- JoinTable temp = lhsJoin;
- lhsJoin = rhsJoin;
- rhsJoin = temp;
- }
-
- List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
- List<OrderByNode> lhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
- List<OrderByNode> rhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
- for (EqualParseNode condition : joinConditionNodes) {
- lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
- rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
- }
-
- Scan lhsScan = ScanUtil.newScan(originalScan);
- StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
- boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
- PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
- boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
-
- Scan rhsScan = ScanUtil.newScan(originalScan);
- StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
- QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
- PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
-
- Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE);
- List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
- List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
-
- boolean needsMerge = rhsJoin.hasPostReference();
- int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
- PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
-
- ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
- TableRef tableRef = resolver.getTables().get(0);
- StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
- subCtx.setCurrentTable(tableRef);
- QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
- context.setCurrentTable(tableRef);
- context.setResolver(resolver);
- TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
- ParseNode where = joinTable.getPostFiltersCombined();
- SelectStatement select = asSubquery
- ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
- Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, null, 0, false,
- joinTable.getStatement().hasSequence(), Collections.<SelectStatement> emptyList(),
+ Scan lhsScan = ScanUtil.newScan(originalScan);
+ StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
+ boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
+ PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
+ boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
+
+ Scan rhsScan = ScanUtil.newScan(originalScan);
+ StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
+ QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
+ PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
+
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy);
+ List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
+ List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
+
+ boolean needsMerge = rhsJoin.hasPostReference();
+ int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
+ PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
+
+ ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
+ TableRef tableRef = resolver.getTables().get(0);
+ StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
+ subCtx.setCurrentTable(tableRef);
+ QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
+ context.setCurrentTable(tableRef);
+ context.setResolver(resolver);
+ TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
+ ParseNode where = joinTable.getPostFiltersCombined();
+ SelectStatement select = asSubquery
+ ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
+ Collections.<AliasedNode>emptyList(), where, null, null, orderBy, null, null, 0, false,
+ joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(),
joinTable.getStatement().getUdfParseNodes())
- : NODE_FACTORY.select(joinTable.getStatement(), from, where);
-
- return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+ : NODE_FACTORY.select(joinTable.getStatement(), from, where);
+
+ return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+ }
+ default:
+ throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
+ }
}
private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, SelectStatement select, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index ca88984..c2edaf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.optimize.Cost;
@@ -90,4 +91,5 @@ public interface QueryPlan extends StatementPlan {
*/
public boolean useRoundRobinIterator() throws SQLException;
+ <T> T accept(QueryPlanVisitor<T> visitor);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 2714858..02aadc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -35,6 +35,7 @@ import org.apache.htrace.Sampler;
import org.apache.htrace.TraceScope;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
@@ -270,6 +271,11 @@ public class TraceQueryPlan implements QueryPlan {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Long getEstimatedRowsToScan() {
return 0l;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 2e042e7..0c8e8dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -33,6 +33,10 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.expression.RowKeyExpression;
@@ -117,25 +121,39 @@ public class AggregatePlan extends BaseQueryPlan {
@Override
public Cost getCost() {
- Long byteCount = null;
+ Double outputBytes = this.accept(new ByteCountVisitor());
+ Double rowWidth = this.accept(new AvgRowWidthVisitor());
+ Long inputRows = null;
try {
- byteCount = getEstimatedBytesToScan();
+ inputRows = getEstimatedRowsToScan();
} catch (SQLException e) {
// ignored.
}
-
- if (byteCount == null) {
+ if (inputRows == null || outputBytes == null || rowWidth == null) {
return Cost.UNKNOWN;
}
+ double inputBytes = inputRows * rowWidth;
+ double rowsBeforeHaving = RowCountVisitor.aggregate(
+ RowCountVisitor.filter(
+ inputRows.doubleValue(),
+ RowCountVisitor.stripSkipScanFilter(
+ context.getScan().getFilter())),
+ groupBy);
+ double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
+ double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+ double bytesAfterHaving = rowWidth * rowsAfterHaving;
int parallelLevel = CostUtil.estimateParallelLevel(
true, context.getConnection().getQueryServices());
- Cost cost = CostUtil.estimateAggregateCost(byteCount,
- groupBy, aggregators.getEstimatedByteSize(), parallelLevel);
+ Cost cost = new Cost(0, 0, inputBytes);
+ Cost aggCost = CostUtil.estimateAggregateCost(
+ inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
+ cost = cost.plus(aggCost);
if (!orderBy.getOrderByExpressions().isEmpty()) {
- double outputBytes = CostUtil.estimateAggregateOutputBytes(
- byteCount, groupBy, aggregators.getEstimatedByteSize());
- Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+ parallelLevel = CostUtil.estimateParallelLevel(
+ false, context.getConnection().getQueryServices());
+ Cost orderByCost = CostUtil.estimateOrderByCost(
+ bytesAfterHaving, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return cost;
@@ -304,4 +322,9 @@ public class AggregatePlan extends BaseQueryPlan {
return false;
}
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
}