You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2018/03/14 04:37:32 UTC
[1/9] phoenix git commit: PHOENIX-4288 Indexes not used when ordering
by primary key
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 1e83415f1 -> 9bb7811f0
PHOENIX-4288 Indexes not used when ordering by primary key
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/541d6ac2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/541d6ac2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/541d6ac2
Branch: refs/heads/4.x-HBase-0.98
Commit: 541d6ac22866fe7571365e063a23108c6ca1ea63
Parents: 1e83415
Author: maryannxue <ma...@gmail.com>
Authored: Tue Dec 5 10:52:46 2017 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 17:16:04 2018 -0700
----------------------------------------------------------------------
.../phoenix/end2end/CostBasedDecisionIT.java | 466 +++++++++++++++++++
.../phoenix/compile/ListJarsQueryPlan.java | 6 +
.../org/apache/phoenix/compile/QueryPlan.java | 5 +-
.../apache/phoenix/compile/TraceQueryPlan.java | 6 +
.../apache/phoenix/execute/AggregatePlan.java | 30 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 19 +-
.../phoenix/execute/ClientAggregatePlan.java | 28 ++
.../apache/phoenix/execute/ClientScanPlan.java | 25 +
.../apache/phoenix/execute/CorrelatePlan.java | 25 +
.../phoenix/execute/DelegateQueryPlan.java | 6 +
.../apache/phoenix/execute/HashJoinPlan.java | 29 ++
.../execute/LiteralResultIterationPlan.java | 6 +
.../org/apache/phoenix/execute/ScanPlan.java | 25 +
.../phoenix/execute/SortMergeJoinPlan.java | 18 +
.../org/apache/phoenix/execute/UnionPlan.java | 10 +
.../apache/phoenix/jdbc/PhoenixStatement.java | 6 +
.../java/org/apache/phoenix/optimize/Cost.java | 123 +++++
.../apache/phoenix/optimize/QueryOptimizer.java | 28 +-
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../phoenix/query/QueryServicesOptions.java | 6 +-
.../java/org/apache/phoenix/util/CostUtil.java | 90 ++++
.../query/ParallelIteratorsSplitTest.java | 6 +
22 files changed, 951 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
new file mode 100644
index 0000000..a3584ce
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+ props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+ props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+ props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrdering1() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey";
+ // Use the data table plan that opts out order-by when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+ plan.contains("FULL SCAN"));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the index table plan that has a lower cost when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+ plan.contains("RANGE SCAN"));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrdering2() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+ // Use the index table plan that opts out order-by when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+ plan.contains("RANGE SCAN"));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Given that the range on C1 is meaningless and group-by becomes
+ // order-preserving if using the data table, the data table plan should
+ // come out as the best plan based on the costs.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+ plan.contains("FULL SCAN"));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrdering3() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 INTEGER,\n" +
+ "c2 INTEGER,\n" +
+ "c3 INTEGER)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+ String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+ // Use the idx2 plan with a wider PK slot span when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String indexPlan =
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+ " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ stmt.setString(1, "k" + i);
+ stmt.setInt(2, i);
+ stmt.setInt(3, i);
+ stmt.setInt(4, i);
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the idx1 plan (range scan on c1) that scans less data when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String dataPlan =
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+ " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(dataPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrderingInUpsertQuery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 INTEGER,\n" +
+ "c2 INTEGER,\n" +
+ "c3 INTEGER)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+ String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+ // Use the idx2 plan with a wider PK slot span when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String indexPlan =
+ "UPSERT SELECT\n" +
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+ " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ stmt.setString(1, "k" + i);
+ stmt.setInt(2, i);
+ stmt.setInt(3, i);
+ stmt.setInt(4, i);
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the idx1 plan (range scan on c1) that scans less data when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String dataPlan =
+ "UPSERT SELECT\n" +
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+ " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(dataPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrderingInDeleteQuery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 INTEGER,\n" +
+ "c2 INTEGER,\n" +
+ "c3 INTEGER)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+ String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+ // Use the idx2 plan with a wider PK slot span when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String indexPlan =
+ "DELETE ROWS\n" +
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+ " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ stmt.setString(1, "k" + i);
+ stmt.setInt(2, i);
+ stmt.setInt(3, i);
+ stmt.setInt(4, i);
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the idx1 plan (range scan on c1) that scans less data when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String dataPlan =
+ "DELETE ROWS\n" +
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+ " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(dataPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrderingInUnionQuery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 "
+ + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+ // Use the default plan when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String defaultPlan =
+ "UNION ALL OVER 2 QUERIES\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+ " CLIENT MERGE SORT\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+ " CLIENT MERGE SORT";
+ assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(defaultPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the optimal plan based on cost when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String optimizedPlan =
+ "UNION ALL OVER 2 QUERIES\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+ " SERVER FILTER BY C1 LIKE 'X%'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]";
+ assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(optimizedPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrderingInJoinQuery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 "
+ + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
+ + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
+ // Use the default plan when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String defaultPlan =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+ " SERVER FILTER BY C1 LIKE 'X0%'\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
+ assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(defaultPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the optimal plan based on cost when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String optimizedPlan =
+ "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+ " SERVER FILTER BY C1 LIKE 'X%'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" +
+ " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)";
+ assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(optimizedPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testHintOverridesCost() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey INTEGER PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where rowkey between 1 and 10 ORDER BY c1";
+ String hintedQuery = query.replaceFirst("SELECT",
+ "SELECT /*+ INDEX(" + tableName + " " + tableName + "_idx) */");
+ String dataPlan = "SERVER SORTED BY [C1]";
+ String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
+
+ // Use the index table plan that opts out order-by when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setInt(1, i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the data table plan that has a lower cost when stats are available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(dataPlan));
+
+ // Use the index table plan as has been hinted.
+ rs = conn.createStatement().executeQuery("explain " + hintedQuery);
+ plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+ } finally {
+ conn.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 1888114..fa48e52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
@@ -186,6 +187,11 @@ public class ListJarsQueryPlan implements QueryPlan {
}
@Override
+ public Cost getCost() {
+ return Cost.ZERO;
+ }
+
+ @Override
public TableRef getTableRef() {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index f7cdcbf..ca88984 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -52,7 +53,9 @@ public interface QueryPlan extends StatementPlan {
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
public long getEstimatedSize();
-
+
+ public Cost getCost();
+
// TODO: change once joins are supported
TableRef getTableRef();
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 2eab965..5cd1d08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
@@ -200,6 +201,11 @@ public class TraceQueryPlan implements QueryPlan {
}
@Override
+ public Cost getCost() {
+ return Cost.ZERO;
+ }
+
+ @Override
public Set<TableRef> getSourceRefs() {
return Collections.emptySet();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 37e0c5a..2e042e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
@@ -67,6 +68,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,7 +114,33 @@ public class AggregatePlan extends BaseQueryPlan {
public Expression getHaving() {
return having;
}
-
+
+ @Override
+ public Cost getCost() {
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // Ignored: byteCount stays null, so Cost.UNKNOWN is returned below.
+ }
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ int parallelLevel = CostUtil.estimateParallelLevel(
+ true, context.getConnection().getQueryServices());
+ Cost cost = CostUtil.estimateAggregateCost(byteCount,
+ groupBy, aggregators.getEstimatedByteSize(), parallelLevel);
+ if (!orderBy.getOrderByExpressions().isEmpty()) {
+ double outputBytes = CostUtil.estimateAggregateOutputBytes(
+ byteCount, groupBy, aggregators.getEstimatedByteSize());
+ Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+ cost = cost.plus(orderByCost);
+ }
+ return cost;
+ }
+
@Override
public List<KeyRange> getSplits() {
if (splits == null)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index df55e63..38ed926 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -504,13 +504,24 @@ public abstract class BaseQueryPlan implements QueryPlan {
if (context.getScanRanges() == ScanRanges.NOTHING) {
return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString()));
}
-
+
+ // If cost-based optimizer is enabled, we need to initialize a dummy iterator to
+ // get the stats for computing costs.
+ boolean costBased =
+ context.getConnection().getQueryServices().getConfiguration().getBoolean(
+ QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+ if (costBased) {
+ ResultIterator iterator = iterator();
+ iterator.close();
+ }
// Optimize here when getting explain plan, as queries don't get optimized until after compilation
QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan();
- this.estimatedRows = plan.getEstimatedRowsToScan();
- this.estimatedSize = plan.getEstimatedBytesToScan();
- this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+ if (!costBased) { // do not override estimates if they are used for cost calculation.
+ this.estimatedRows = plan.getEstimatedRowsToScan();
+ this.estimatedSize = plan.getEstimatedBytesToScan();
+ this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+ }
return exp;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 8ef1f8d..a15ab35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -56,12 +56,14 @@ import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.TupleUtil;
import com.google.common.collect.Lists;
@@ -87,6 +89,32 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
}
@Override
+ public Cost getCost() {
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // Deliberately ignored: byteCount stays null and Cost.UNKNOWN is returned below.
+ }
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ int parallelLevel = CostUtil.estimateParallelLevel(
+ false, context.getConnection().getQueryServices()); // false = not running on the server
+ Cost cost = CostUtil.estimateAggregateCost(byteCount,
+ groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel);
+ if (!orderBy.getOrderByExpressions().isEmpty()) { // an ORDER BY is present: add the cost of sorting the aggregate output
+ double outputBytes = CostUtil.estimateAggregateOutputBytes(
+ byteCount, groupBy, clientAggregators.getEstimatedByteSize());
+ Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+ cost = cost.plus(orderByCost);
+ }
+ return super.getCost().plus(cost); // inherited cost plus the aggregation (and sort) cost computed here
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 6bbc545..5799990 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -34,10 +34,12 @@ import org.apache.phoenix.iterate.OrderedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
import com.google.common.collect.Lists;
@@ -50,6 +52,29 @@ public class ClientScanPlan extends ClientProcessingPlan {
}
@Override
+ public Cost getCost() {
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // Deliberately ignored: byteCount stays null and Cost.UNKNOWN is returned below.
+ }
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ Cost cost = new Cost(0, 0, byteCount); // I/O-only cost: cpu and memory are left at zero
+ int parallelLevel = CostUtil.estimateParallelLevel(
+ false, context.getConnection().getQueryServices()); // false = not running on the server
+ if (!orderBy.getOrderByExpressions().isEmpty()) { // an ORDER BY is present: add the cost of sorting byteCount bytes
+ Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+ cost = cost.plus(orderByCost);
+ }
+ return super.getCost().plus(cost); // inherited cost plus the scan (and sort) cost computed here
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index ee81c36..270ad3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
@@ -200,4 +201,28 @@ public class CorrelatePlan extends DelegateQueryPlan {
return null;
}
+ @Override
+ public Cost getCost() { // lhs cost + rhs cost + an I/O term of (lhs bytes x rhs rows)
+ Long lhsByteCount = null;
+ try {
+ lhsByteCount = delegate.getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // Deliberately ignored: a missing estimate is reported as Cost.UNKNOWN below.
+ }
+ Long rhsRowCount = null;
+ try {
+ rhsRowCount = rhs.getEstimatedRowsToScan();
+ } catch (SQLException e) {
+ // Deliberately ignored: a missing estimate is reported as Cost.UNKNOWN below.
+ }
+
+ if (lhsByteCount == null || rhsRowCount == null) {
+ return Cost.UNKNOWN;
+ }
+ // Multiply in double precision: the long product bytes * rows can exceed Long.MAX_VALUE and wrap.
+ Cost cost = new Cost(0, 0, lhsByteCount.doubleValue() * rhsRowCount);
+ Cost lhsCost = delegate.getCost();
+ return cost.plus(lhsCost).plus(rhs.getCost()); // plus() yields UNKNOWN if either side is UNKNOWN
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 3c62c5b..3da06db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -59,6 +60,11 @@ public abstract class DelegateQueryPlan implements QueryPlan {
}
@Override
+ public Cost getCost() {
+ return delegate.getCost(); // pure pass-through of the wrapped plan's cost
+ }
+
+ @Override
public TableRef getTableRef() {
return delegate.getTableRef();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2b90dcb..2d2ff4e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
@@ -290,6 +291,34 @@ public class HashJoinPlan extends DelegateQueryPlan {
return statement;
}
+ @Override
+ public Cost getCost() {
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // Deliberately ignored: byteCount stays null and Cost.UNKNOWN is returned below.
+ }
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ Cost cost = new Cost(0, 0, byteCount); // I/O-only cost: cpu and memory are left at zero
+ Cost lhsCost = delegate.getCost();
+ if (keyRangeExpressions != null) {
+ // Assumed selectivity of the dynamic row-key filter; it scales down the LHS cost.
+ // TODO replace the constant with an estimated value.
+ double selectivity = 0.01;
+ lhsCost = lhsCost.multiplyBy(selectivity);
+ }
+ Cost rhsCost = Cost.ZERO;
+ for (SubPlan subPlan : subPlans) { // accumulate the cost of every sub-plan's inner plan
+ rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+ }
+ return cost.plus(lhsCost).plus(rhsCost); // plus() yields UNKNOWN if any operand is UNKNOWN
+ }
+
protected interface SubPlan {
public ServerCache execute(HashJoinPlan parent) throws SQLException;
public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 4470947..c9abb69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -60,6 +61,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
}
@Override
+ public Cost getCost() {
+ return Cost.ZERO; // this plan always reports the constant zero cost
+ }
+
+ @Override
public List<KeyRange> getSplits() {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index af25bff..d63950c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -65,6 +66,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -193,6 +195,29 @@ public class ScanPlan extends BaseQueryPlan {
}
@Override
+ public Cost getCost() {
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // Deliberately ignored: byteCount stays null and Cost.UNKNOWN is returned below.
+ }
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ Cost cost = new Cost(0, 0, byteCount); // I/O-only cost: cpu and memory are left at zero
+ int parallelLevel = CostUtil.estimateParallelLevel(
+ true, context.getConnection().getQueryServices()); // true = running on the server
+ if (!orderBy.getOrderByExpressions().isEmpty()) { // an ORDER BY is present: add the cost of sorting byteCount bytes
+ Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+ cost = cost.plus(orderByCost);
+ }
+ return cost;
+ }
+
+ @Override
public List<KeyRange> getSplits() {
if (splits == null)
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index fab7c59..3e380da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.query.KeyRange;
@@ -192,6 +193,23 @@ public class SortMergeJoinPlan implements QueryPlan {
}
@Override
+ public Cost getCost() {
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // Deliberately ignored: byteCount stays null and Cost.UNKNOWN is returned below.
+ }
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ Cost cost = new Cost(0, 0, byteCount); // I/O-only cost: cpu and memory are left at zero
+ return cost.plus(lhsPlan.getCost()).plus(rhsPlan.getCost()); // plus() yields UNKNOWN if either side is UNKNOWN
+ }
+
+ @Override
public StatementContext getContext() {
return context;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index e06522f..e6bf654 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.UnionResultIterators;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -210,6 +211,15 @@ public class UnionPlan implements QueryPlan {
}
@Override
+ public Cost getCost() {
+ Cost cost = Cost.ZERO;
+ for (QueryPlan plan : plans) { // sum the costs of all sub-plans
+ cost = cost.plus(plan.getCost()); // plus() yields UNKNOWN once any sub-plan's cost is UNKNOWN
+ }
+ return cost;
+ }
+
+ @Override
public ParameterMetaData getParameterMetaData() {
return paramMetaData;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f418bc9..94aeb34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.iterate.MaterializedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AddJarsStatement;
import org.apache.phoenix.parse.AliasedNode;
@@ -644,6 +645,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
@Override
+ public Cost getCost() {
+ return Cost.ZERO; // fixed zero cost; no estimate is computed for this plan
+ }
+
+ @Override
public TableRef getTableRef() {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
new file mode 100644
index 0000000..b83f354
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.optimize;
+
+import java.util.Objects;
+
+/**
+ * Optimizer cost in terms of CPU, memory, and I/O usage, the unit of which is now the
+ * number of bytes processed.
+ *
+ */
+public class Cost implements Comparable<Cost> {
+ /** The unknown cost. */
+ public static Cost UNKNOWN = new Cost(Double.NaN, Double.NaN, Double.NaN) {
+ @Override
+ public String toString() {
+ return "{unknown}";
+ }
+ };
+
+ /** The zero cost. */
+ public static Cost ZERO = new Cost(0, 0, 0) {
+ @Override
+ public String toString() {
+ return "{zero}";
+ }
+ };
+
+ private final double cpu;
+ private final double memory;
+ private final double io;
+
+ public Cost(double cpu, double memory, double io) {
+ this.cpu = cpu;
+ this.memory = memory;
+ this.io = io;
+ }
+
+ public double getCpu() {
+ return cpu;
+ }
+
+ public double getMemory() {
+ return memory;
+ }
+
+ public double getIo() {
+ return io;
+ }
+
+ public boolean isUnknown() {
+ return this == UNKNOWN;
+ }
+
+ public Cost plus(Cost other) {
+ if (isUnknown() || other.isUnknown()) {
+ return UNKNOWN;
+ }
+
+ return new Cost(
+ this.cpu + other.cpu,
+ this.memory + other.memory,
+ this.io + other.io);
+ }
+
+ public Cost multiplyBy(double factor) {
+ if (isUnknown()) {
+ return UNKNOWN;
+ }
+
+ return new Cost(
+ this.cpu * factor,
+ this.memory * factor,
+ this.io * factor);
+ }
+
+ // TODO right now for simplicity, we choose to ignore CPU and memory costs. We may
+ // add those into account as our cost model mature.
+ @Override
+ public int compareTo(Cost other) {
+ if (isUnknown() && other.isUnknown()) {
+ return 0;
+ } else if (isUnknown() && !other.isUnknown()) {
+ return 1;
+ } else if (!isUnknown() && other.isUnknown()) {
+ return -1;
+ }
+
+ double d = this.io - other.io;
+ return d == 0 ? 0 : (d > 0 ? 1 : -1);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj
+ || (obj instanceof Cost && this.compareTo((Cost) obj) == 0);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cpu, memory, io);
+ }
+
+ @Override
+ public String toString() {
+ return "{cpu: " + cpu + ", memory: " + memory + ", io: " + io + "}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 028fc94..8481bc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -69,11 +69,13 @@ public class QueryOptimizer {
private final QueryServices services;
private final boolean useIndexes;
+ private final boolean costBased;
private long indexPendingDisabledThreshold;
public QueryOptimizer(QueryServices services) {
this.services = services;
this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
+ this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); // when true, candidate plans are first ordered by getCost()
this.indexPendingDisabledThreshold = this.services.getProps().getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD,
QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD);
}
@@ -96,7 +98,7 @@ public class QueryOptimizer {
}
public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
- List<QueryPlan>plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
+ List<QueryPlan> plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
return plans.get(0);
}
@@ -329,7 +331,8 @@ public class QueryOptimizer {
/**
* Order the plans among all the possible ones from best to worst.
- * Since we don't keep stats yet, we use the following simple algorithm:
+ * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on
+ * their costs, otherwise we use the following simple algorithm:
* 1) If the query is a point lookup (i.e. we have a set of exact row keys), choose that one immediately.
* 2) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expression
* in the same order as the row key columns.
@@ -337,9 +340,6 @@ public class QueryOptimizer {
* a) the most row key columns that may be used to form the start/stop scan key (i.e. bound slots).
* b) the plan that preserves ordering for a group by.
* c) the non local index table plan
- * TODO: We should make more of a cost based choice: The largest number of bound slots does not necessarily
- * correspond to the least bytes scanned. We could consider the slots bound for upper and lower ranges
- * separately, or we could calculate the bytes scanned between the start and stop row of each table.
* @param plans the list of candidate plans
* @return list of plans ordered from best to worst.
*/
@@ -348,7 +348,21 @@ public class QueryOptimizer {
if (plans.size() == 1) {
return plans;
}
-
+
+ if (this.costBased) {
+ Collections.sort(plans, new Comparator<QueryPlan>() {
+ @Override
+ public int compare(QueryPlan plan1, QueryPlan plan2) {
+ return plan1.getCost().compareTo(plan2.getCost());
+ }
+ });
+ // Return ordered list based on cost if stats are available; otherwise fall
+ // back to static ordering.
+ if (!plans.get(0).getCost().isUnknown()) {
+ return stopAtBestPlan ? plans.subList(0, 1) : plans;
+ }
+ }
+
/**
* If we have a plan(s) that are just point lookups (i.e. fully qualified row
* keys), then favor those first.
@@ -448,7 +462,7 @@ public class QueryOptimizer {
}
});
-
+
return stopAtBestPlan ? bestCandidates.subList(0, 1) : bestCandidates;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 56d6e06..9dd3074 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -290,6 +290,8 @@ public interface QueryServices extends SQLCloseable {
//Update Cache Frequency default config attribute
public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB = "phoenix.default.update.cache.frequency";
+ // Whether to enable cost-based-decision in the query optimizer
+ public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled";
public static final String SMALL_SCAN_THRESHOLD_ATTRIB = "phoenix.query.smallScanThreshold";
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 33a319b..11d9784 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
+import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
@@ -329,6 +330,8 @@ public class QueryServicesOptions {
// RS -> RS calls for upsert select statements are disabled by default
public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false;
+ public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
+
private final Configuration config;
private QueryServicesOptions(Configuration config) {
@@ -403,7 +406,8 @@ public class QueryServicesOptions {
.setIfUnset(PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD, DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD)
.setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
.setIfUnset(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED)
- .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION);
+ .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION)
+ .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
// it to 1, so we'll change it.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
new file mode 100644
index 0000000..1d4b8e0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.optimize.Cost;
+import org.apache.phoenix.query.QueryServices;
+
+/**
+ * Utilities for computing costs.
+ *
+ * Some of the methods here should eventually be replaced by a metadata framework which
+ * estimates output metrics for each QueryPlan or operation, e.g. row count, byte count,
+ * etc.
+ */
+ public class CostUtil {
+
+ // An estimate of the ratio of result data from group-by against the input data.
+ private final static double GROUPING_FACTOR = 0.1;
+
+ // I/O operations conducted in intermediate evaluations like sorting or aggregation
+ // should be counted twice since they usually involve both read and write.
+ private final static double IO_COST_MULTIPLIER = 2.0;
+
+ /**
+ * Estimate the number of output bytes of an aggregate.
+ * @param byteCount the number of input bytes
+ * @param groupBy the compiled GroupBy object
+ * @param aggregatorsSize the byte size of aggregators
+ * @return the output byte count
+ */
+ public static double estimateAggregateOutputBytes(
+ double byteCount, GroupBy groupBy, int aggregatorsSize) {
+ if (groupBy.isUngroupedAggregate()) {
+ return aggregatorsSize; // ungrouped aggregate: the output is estimated as the aggregators' size
+ }
+ return byteCount * GROUPING_FACTOR; // grouped: a fixed fraction of the input bytes
+ }
+
+ /**
+ * Estimate the cost of an aggregate.
+ * @param byteCount the number of input bytes
+ * @param groupBy the compiled GroupBy object
+ * @param aggregatorsSize the byte size of aggregators
+ * @param parallelLevel number of parallel workers or threads; the I/O cost is divided by it
+ * @return the cost, with only the I/O component set
+ */
+ public static Cost estimateAggregateCost(
+ double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) {
+ double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize);
+ double orderedFactor = groupBy.isOrderPreserving() ? 0.2 : 1.0; // order-preserving group-by is weighted at 20%
+ return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel);
+ }
+
+ /**
+ * Estimate the cost of an order-by.
+ * @param byteCount the number of input bytes
+ * @param parallelLevel number of parallel workers or threads; the I/O cost is divided by it
+ * @return the cost, with only the I/O component set
+ */
+ public static Cost estimateOrderByCost(double byteCount, int parallelLevel) {
+ return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel);
+ }
+
+ /**
+ * Estimate the parallel level of an operation.
+ * @param runningOnServer if the operation will be running on server side
+ * @param services the QueryServices object (currently unused)
+ * @return the parallel level
+ */
+ public static int estimateParallelLevel(boolean runningOnServer, QueryServices services) {
+ // TODO currently returns constants for simplicity; should be derived from the cluster config.
+ return runningOnServer ? 10 : 1;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index cb34d2b..1903dda 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
@@ -486,6 +487,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
public Long getEstimateInfoTimestamp() throws SQLException {
return null;
}
+
+ @Override
+ public Cost getCost() {
+ return Cost.ZERO; // stub plan in this test: reports a fixed zero cost
+ }
}, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null, null);
List<KeyRange> keyRanges = parallelIterators.getSplits();
[8/9] phoenix git commit: PHOENIX-4585 Prune local index regions used
for join queries
Posted by ma...@apache.org.
PHOENIX-4585 Prune local index regions used for join queries
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/babda325
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/babda325
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/babda325
Branch: refs/heads/4.x-HBase-0.98
Commit: babda3258921fdf4de595ba734d972860d58a0a4
Parents: 6914d54
Author: maryannxue <ma...@gmail.com>
Authored: Fri Feb 16 11:29:25 2018 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 21:31:00 2018 -0700
----------------------------------------------------------------------
.../apache/phoenix/compile/JoinCompiler.java | 37 ++--
.../apache/phoenix/compile/QueryCompiler.java | 60 +++---
.../phoenix/compile/QueryCompilerTest.java | 186 ++++++++++++++++++-
3 files changed, 238 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/babda325/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f5a7e39..4020cf9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1199,7 +1199,8 @@ public class JoinCompiler {
return AndExpression.create(expressions);
}
- public static SelectStatement optimize(PhoenixStatement statement, SelectStatement select, final ColumnResolver resolver) throws SQLException {
+ public static Pair<SelectStatement, Map<TableRef, QueryPlan>> optimize(
+ PhoenixStatement statement, SelectStatement select, final ColumnResolver resolver) throws SQLException {
TableRef groupByTableRef = null;
TableRef orderByTableRef = null;
if (select.getGroupBy() != null && !select.getGroupBy().isEmpty()) {
@@ -1226,7 +1227,7 @@ public class JoinCompiler {
QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false, null);
List<Object> binds = statement.getParameters();
StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
- QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null);
+ QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null, Collections.<TableRef, QueryPlan>emptyMap());
TableRef table = plan.getTableRef();
if (groupByTableRef != null && !groupByTableRef.equals(table)) {
groupByTableRef = null;
@@ -1236,7 +1237,8 @@ public class JoinCompiler {
}
}
- final Map<TableRef, TableRef> replacement = new HashMap<TableRef, TableRef>();
+ Map<TableRef, TableRef> replacementMap = null;
+ Map<TableRef, QueryPlan> dataPlanMap = null;
for (Table table : join.getTables()) {
if (table.isSubselect())
@@ -1245,19 +1247,30 @@ public class JoinCompiler {
List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null;
List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null;
SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), table.getTableSamplingRate(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes());
- // TODO: As port of PHOENIX-4585, we need to make sure this plan has a pointer to the data plan
- // when an index is used instead of the data table, and that this method returns that
- // state for downstream processing.
// TODO: It seems inefficient to be recompiling the statement again and again inside of this optimize call
- QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt);
- if (!plan.getTableRef().equals(tableRef)) {
- replacement.put(tableRef, plan.getTableRef());
+ QueryPlan dataPlan =
+ new QueryCompiler(
+ statement, stmt,
+ FromCompiler.getResolverForQuery(stmt, statement.getConnection()),
+ false, null)
+ .compile();
+ QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, dataPlan);
+ TableRef newTableRef = plan.getTableRef();
+ if (!newTableRef.equals(tableRef)) {
+ if (replacementMap == null) {
+ replacementMap = new HashMap<TableRef, TableRef>();
+ dataPlanMap = new HashMap<TableRef, QueryPlan>();
+ }
+ replacementMap.put(tableRef, newTableRef);
+ dataPlanMap.put(newTableRef, dataPlan);
}
}
- if (replacement.isEmpty())
- return select;
+ if (replacementMap == null)
+ return new Pair<SelectStatement, Map<TableRef, QueryPlan>>(
+ select, Collections.<TableRef, QueryPlan> emptyMap());
+ final Map<TableRef, TableRef> replacement = replacementMap;
TableNode from = select.getFrom();
TableNode newFrom = from.accept(new TableNodeVisitor<TableNode>() {
private TableRef resolveTable(String alias, TableName name) throws SQLException {
@@ -1319,7 +1332,7 @@ public class JoinCompiler {
// replace expressions with corresponding matching columns for functional indexes
indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), statement.getConnection(), indexSelect.getUdfParseNodes()));
}
- return indexSelect;
+ return new Pair<SelectStatement, Map<TableRef, QueryPlan>>(indexSelect, dataPlanMap);
}
private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, Double tableSamplingRate, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/babda325/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index c8650b9..855b143 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -22,9 +22,9 @@ import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -184,7 +184,7 @@ public class QueryCompiler {
select.hasWildcard() ? null : select.getSelect());
ColumnResolver resolver = FromCompiler.getResolver(tableRef);
StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
- QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false);
+ QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false, null);
plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(),
plan.getOffset(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans,
context.getBindManager().getParameterMetaData());
@@ -195,15 +195,18 @@ public class QueryCompiler {
List<Object> binds = statement.getParameters();
StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
if (select.isJoin()) {
- select = JoinCompiler.optimize(statement, select, resolver);
- if (this.select != select) {
- ColumnResolver resolver = FromCompiler.getResolverForQuery(select, statement.getConnection());
+ Pair<SelectStatement, Map<TableRef, QueryPlan>> optimized =
+ JoinCompiler.optimize(statement, select, resolver);
+ SelectStatement optimizedSelect = optimized.getFirst();
+ if (select != optimizedSelect) {
+ ColumnResolver resolver = FromCompiler.getResolverForQuery(optimizedSelect, statement.getConnection());
context = new StatementContext(statement, resolver, scan, sequenceManager);
}
- JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
- return compileJoinQuery(context, binds, joinTable, false, false, null);
+ JoinTable joinTable = JoinCompiler.compile(statement, optimizedSelect, context.getResolver());
+ return compileJoinQuery(
+ context, binds, joinTable, false, false, null, optimized.getSecond());
} else {
- return compileSingleQuery(context, select, binds, false, true);
+ return compileSingleQuery(context, select, binds, false, true, dataPlan);
}
}
@@ -216,7 +219,7 @@ public class QueryCompiler {
* 2) Otherwise, return the join plan compiled with the default strategy.
* @see JoinCompiler.JoinTable#getApplicableJoinStrategies()
*/
- protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
+ protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
if (joinTable.getJoinSpecs().isEmpty()) {
Table table = joinTable.getTable();
SelectStatement subquery = table.getAsSubquery(orderBy);
@@ -227,7 +230,8 @@ public class QueryCompiler {
TupleProjector.serializeProjectorIntoScan(context.getScan(), projector);
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
table.projectColumns(context.getScan());
- return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true);
+ QueryPlan dataPlan = dataPlans.get(table.getTableRef());
+ return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true, dataPlan);
}
QueryPlan plan = compileSubquery(subquery, false);
PTable projectedTable = table.createProjectedTable(plan.getProjector());
@@ -239,7 +243,7 @@ public class QueryCompiler {
assert strategies.size() > 0;
if (!costBased || strategies.size() == 1) {
return compileJoinQuery(
- strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+ strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy, dataPlans);
}
QueryPlan bestPlan = null;
@@ -248,7 +252,7 @@ public class QueryCompiler {
StatementContext newContext = new StatementContext(
context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager());
QueryPlan plan = compileJoinQuery(
- strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+ strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy, dataPlans);
Cost cost = plan.getCost();
if (bestPlan == null || cost.compareTo(bestCost) < 0) {
bestPlan = plan;
@@ -260,7 +264,7 @@ public class QueryCompiler {
return bestPlan;
}
- protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
+ protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
byte[] emptyByteArray = new byte[0];
List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
switch (strategy) {
@@ -303,7 +307,7 @@ public class QueryCompiler {
JoinSpec joinSpec = joinSpecs.get(i);
Scan subScan = ScanUtil.newScan(originalScan);
subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
+ subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null, dataPlans);
boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
if (hasPostReference) {
tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
@@ -330,7 +334,8 @@ public class QueryCompiler {
hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
}
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
- QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+ QueryPlan dataPlan = dataPlans.get(tableRef);
+ QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true, dataPlan);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
Integer limit = null;
Integer offset = null;
@@ -350,7 +355,7 @@ public class QueryCompiler {
JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
Scan subScan = ScanUtil.newScan(originalScan);
StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null, dataPlans);
PTable rhsProjTable;
TableRef rhsTableRef;
SelectStatement rhs;
@@ -383,7 +388,8 @@ public class QueryCompiler {
PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
- QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+ QueryPlan dataPlan = dataPlans.get(rhsTableRef);
+ QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true, dataPlan);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
Integer limit = null;
Integer offset = null;
@@ -420,13 +426,13 @@ public class QueryCompiler {
Scan lhsScan = ScanUtil.newScan(originalScan);
StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy, dataPlans);
PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
Scan rhsScan = ScanUtil.newScan(originalScan);
StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
- QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
+ QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy, dataPlans);
PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy);
@@ -453,7 +459,7 @@ public class QueryCompiler {
joinTable.getStatement().getUdfParseNodes())
: NODE_FACTORY.select(joinTable.getStatement(), from, where);
- return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+ return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder, null);
}
default:
throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
@@ -506,16 +512,16 @@ public class QueryCompiler {
}
int maxRows = this.statement.getMaxRows();
this.statement.setMaxRows(pushDownMaxRows ? maxRows : 0); // overwrite maxRows to avoid its impact on inner queries.
- QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, dataPlan).compile();
+ QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, null).compile();
plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
this.statement.setMaxRows(maxRows); // restore maxRows.
return plan;
}
-
- protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
+
+ protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan dataPlan) throws SQLException{
SelectStatement innerSelect = select.getInnerSelectStatement();
if (innerSelect == null) {
- return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true);
+ return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true, dataPlan);
}
QueryPlan innerPlan = compileSubquery(innerSelect, false);
@@ -530,10 +536,10 @@ public class QueryCompiler {
context.setCurrentTable(tableRef);
boolean isInRowKeyOrder = innerPlan.getGroupBy() == GroupBy.EMPTY_GROUP_BY && innerPlan.getOrderBy() == OrderBy.EMPTY_ORDER_BY;
- return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder);
+ return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder, null);
}
-
- protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder) throws SQLException{
+
+ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder, QueryPlan dataPlan) throws SQLException{
PTable projectedTable = null;
if (this.projectTuples) {
projectedTable = TupleProjectionCompiler.createProjectedTable(select, context);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/babda325/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 05358d4..73cd69c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -43,6 +43,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
@@ -51,12 +52,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.execute.AggregatePlan;
-import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.HashJoinPlan;
-import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.execute.SortMergeJoinPlan;
-import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.*;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.aggregator.Aggregator;
@@ -88,6 +85,7 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -4458,4 +4456,180 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
}
}
+ @Test
+ public void testLocalIndexPruningInSortMergeJoin() throws SQLException {
+ verifyLocalIndexPruningWithMultipleTables("SELECT /*+ USE_SORT_MERGE_JOIN*/ *\n" +
+ "FROM T1 JOIN T2 ON T1.A = T2.A\n" +
+ "WHERE T1.A = 'B' and T1.C='C' and T2.A IN ('A','G') and T2.B = 'A' and T2.D = 'D'");
+ }
+
+ @Ignore("Blocked by PHOENIX-4614")
+ @Test
+ public void testLocalIndexPruningInLeftOrInnerHashJoin() throws SQLException {
+ verifyLocalIndexPruningWithMultipleTables("SELECT *\n" +
+ "FROM T1 JOIN T2 ON T1.A = T2.A\n" +
+ "WHERE T1.A = 'B' and T1.C='C' and T2.A IN ('A','G') and T2.B = 'A' and T2.D = 'D'");
+ }
+
+ @Ignore("Blocked by PHOENIX-4614")
+ @Test
+ public void testLocalIndexPruningInRightHashJoin() throws SQLException {
+ verifyLocalIndexPruningWithMultipleTables("SELECT *\n" +
+ "FROM (\n" +
+ " SELECT A, B, C, D FROM T2 WHERE T2.A IN ('A','G') and T2.B = 'A' and T2.D = 'D'\n" +
+ ") T2\n" +
+ "RIGHT JOIN T1 ON T2.A = T1.A\n" +
+ "WHERE T1.A = 'B' and T1.C='C'");
+ }
+
+ @Test
+ public void testLocalIndexPruningInUinon() throws SQLException {
+ verifyLocalIndexPruningWithMultipleTables("SELECT A, B, C FROM T1\n" +
+ "WHERE A = 'B' and C='C'\n" +
+ "UNION ALL\n" +
+ "SELECT A, B, C FROM T2\n" +
+ "WHERE A IN ('A','G') and B = 'A' and D = 'D'");
+ }
+
+ private void verifyLocalIndexPruningWithMultipleTables(String query) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE T1 (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B CHAR(1) NOT NULL,\n" +
+ " C CHAR(1) NOT NULL,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX1 ON T1(A,C)");
+ conn.createStatement().execute("CREATE TABLE T2 (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B CHAR(1) NOT NULL,\n" +
+ " C CHAR(1) NOT NULL,\n" +
+ " D CHAR(1) NOT NULL,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C,\n" +
+ " D\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX2 ON T2(A,B,D)");
+ PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = statement.optimizeQuery(query);
+ List<QueryPlan> childPlans = plan.accept(new MultipleChildrenExtractor());
+ assertEquals(2, childPlans.size());
+ // Check left child
+ assertEquals("IDX1", childPlans.get(0).getContext().getCurrentTable().getTable().getName().getString());
+ childPlans.get(0).iterator();
+ List<List<Scan>> outerScansL = childPlans.get(0).getScans();
+ assertEquals(1, outerScansL.size());
+ List<Scan> innerScansL = outerScansL.get(0);
+ assertEquals(1, innerScansL.size());
+ Scan scanL = innerScansL.get(0);
+ assertEquals("A", Bytes.toString(scanL.getStartRow()).trim());
+ assertEquals("C", Bytes.toString(scanL.getStopRow()).trim());
+ // Check right child
+ assertEquals("IDX2", childPlans.get(1).getContext().getCurrentTable().getTable().getName().getString());
+ childPlans.get(1).iterator();
+ List<List<Scan>> outerScansR = childPlans.get(1).getScans();
+ assertEquals(2, outerScansR.size());
+ List<Scan> innerScansR1 = outerScansR.get(0);
+ assertEquals(1, innerScansR1.size());
+ Scan scanR1 = innerScansR1.get(0);
+ assertEquals("A", Bytes.toString(scanR1.getStartRow()).trim());
+ assertEquals("C", Bytes.toString(scanR1.getStopRow()).trim());
+ List<Scan> innerScansR2 = outerScansR.get(1);
+ assertEquals(1, innerScansR2.size());
+ Scan scanR2 = innerScansR2.get(0);
+ assertEquals("G", Bytes.toString(scanR2.getStartRow()).trim());
+ assertEquals("I", Bytes.toString(scanR2.getStopRow()).trim());
+ }
+ }
+
+ private static class MultipleChildrenExtractor implements QueryPlanVisitor<List<QueryPlan>> {
+
+ @Override
+ public List<QueryPlan> defaultReturn(QueryPlan plan) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<QueryPlan> visit(AggregatePlan plan) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<QueryPlan> visit(ScanPlan plan) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<QueryPlan> visit(ClientAggregatePlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public List<QueryPlan> visit(ClientScanPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public List<QueryPlan> visit(LiteralResultIterationPlan plan) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<QueryPlan> visit(TupleProjectionPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public List<QueryPlan> visit(HashJoinPlan plan) {
+ List<QueryPlan> children = new ArrayList<QueryPlan>(plan.getSubPlans().length + 1);
+ children.add(plan.getDelegate());
+ for (HashJoinPlan.SubPlan subPlan : plan.getSubPlans()) {
+ children.add(subPlan.getInnerPlan());
+ }
+ return children;
+ }
+
+ @Override
+ public List<QueryPlan> visit(SortMergeJoinPlan plan) {
+ return Lists.newArrayList(plan.getLhsPlan(), plan.getRhsPlan());
+ }
+
+ @Override
+ public List<QueryPlan> visit(UnionPlan plan) {
+ return plan.getSubPlans();
+ }
+
+ @Override
+ public List<QueryPlan> visit(UnnestArrayPlan plan) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<QueryPlan> visit(CorrelatePlan plan) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<QueryPlan> visit(CursorFetchPlan plan) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<QueryPlan> visit(ListJarsQueryPlan plan) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<QueryPlan> visit(TraceQueryPlan plan) {
+ return Collections.emptyList();
+ }
+ }
}
[6/9] phoenix git commit: PHOENIX-1556 Base hash versus sort merge
join decision on cost
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index a15ab35..21cbc2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -35,6 +35,10 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.expression.aggregator.Aggregators;
@@ -90,25 +94,30 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
@Override
public Cost getCost() {
- Long byteCount = null;
- try {
- byteCount = getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
-
- if (byteCount == null) {
+ Double outputBytes = this.accept(new ByteCountVisitor());
+ Double inputRows = this.getDelegate().accept(new RowCountVisitor());
+ Double rowWidth = this.accept(new AvgRowWidthVisitor());
+ if (inputRows == null || outputBytes == null || rowWidth == null) {
return Cost.UNKNOWN;
}
+ double inputBytes = inputRows * rowWidth;
+ double rowsBeforeHaving = RowCountVisitor.aggregate(
+ RowCountVisitor.filter(
+ inputRows.doubleValue(),
+ RowCountVisitor.stripSkipScanFilter(
+ context.getScan().getFilter())),
+ groupBy);
+ double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
+ double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+ double bytesAfterHaving = rowWidth * rowsAfterHaving;
int parallelLevel = CostUtil.estimateParallelLevel(
false, context.getConnection().getQueryServices());
- Cost cost = CostUtil.estimateAggregateCost(byteCount,
- groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel);
+ Cost cost = CostUtil.estimateAggregateCost(
+ inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
if (!orderBy.getOrderByExpressions().isEmpty()) {
- double outputBytes = CostUtil.estimateAggregateOutputBytes(
- byteCount, groupBy, clientAggregators.getEstimatedByteSize());
- Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+ Cost orderByCost = CostUtil.estimateOrderByCost(
+ bytesAfterHaving, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return super.getCost().plus(cost);
@@ -210,7 +219,16 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
public GroupBy getGroupBy() {
return groupBy;
}
-
+
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ public Expression getHaving() {
+ return having;
+ }
+
private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
private final List<Expression> groupByExpressions;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
index ac43919..75ba8f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -85,4 +85,8 @@ public abstract class ClientProcessingPlan extends DelegateQueryPlan {
public FilterableStatement getStatement() {
return statement;
}
+
+ public Expression getWhere() {
+ return where;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 5799990..3427f5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -26,6 +26,8 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.FilterResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -53,28 +55,30 @@ public class ClientScanPlan extends ClientProcessingPlan {
@Override
public Cost getCost() {
- Long byteCount = null;
- try {
- byteCount = getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
+ Double inputBytes = this.getDelegate().accept(new ByteCountVisitor());
+ Double outputBytes = this.accept(new ByteCountVisitor());
- if (byteCount == null) {
+ if (inputBytes == null || outputBytes == null) {
return Cost.UNKNOWN;
}
- Cost cost = new Cost(0, 0, byteCount);
int parallelLevel = CostUtil.estimateParallelLevel(
false, context.getConnection().getQueryServices());
+ Cost cost = new Cost(0, 0, 0);
if (!orderBy.getOrderByExpressions().isEmpty()) {
- Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+ Cost orderByCost =
+ CostUtil.estimateOrderByCost(inputBytes, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return super.getCost().plus(cost);
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 270ad3d..e3e0264 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -28,6 +28,9 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.optimize.Cost;
@@ -202,19 +205,18 @@ public class CorrelatePlan extends DelegateQueryPlan {
}
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(CorrelatePlan)} overload. */
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** Returns the right-hand-side plan held in the {@code rhs} field. */
+ public QueryPlan getRhsPlan() {
+ return rhs;
+ }
+
+ @Override
public Cost getCost() {
- Long lhsByteCount = null;
- try {
- lhsByteCount = delegate.getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
- Long rhsRowCount = null;
- try {
- rhsRowCount = rhs.getEstimatedRowsToScan();
- } catch (SQLException e) {
- // ignored.
- }
+ Double lhsByteCount = delegate.accept(new ByteCountVisitor());
+ Double rhsRowCount = rhs.accept(new RowCountVisitor());
if (lhsByteCount == null || rhsRowCount == null) {
return Cost.UNKNOWN;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
index cf0a3cf..0ecf74d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.iterate.CursorResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -51,6 +52,11 @@ public class CursorFetchPlan extends DelegateQueryPlan {
return resultIterator;
}
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(CursorFetchPlan)} overload. */
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public ExplainPlan getExplainPlan() throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 23a0da6..6ade42e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -48,6 +48,9 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.InListExpression;
@@ -63,10 +66,7 @@ import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.ParseNode;
-import org.apache.phoenix.parse.SQLParser;
-import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.*;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -77,6 +77,7 @@ import org.apache.phoenix.schema.types.PArrayDataType;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.SQLCloseables;
import com.google.common.collect.Lists;
@@ -92,6 +93,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
private final boolean recompileWhereClause;
private final Set<TableRef> tableRefs;
private final int maxServerCacheTimeToLive;
+ private final long serverCacheLimit;
private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap();
private HashCacheClient hashClient;
private AtomicLong firstJobEndTime;
@@ -132,8 +134,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
for (SubPlan subPlan : subPlans) {
tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs());
}
- this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt(
+ QueryServices services = plan.getContext().getConnection().getQueryServices();
+ this.maxServerCacheTimeToLive = services.getProps().getInt(
QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+ this.serverCacheLimit = services.getProps().getLong(
+ QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
}
@Override
@@ -270,40 +275,101 @@ public class HashJoinPlan extends DelegateQueryPlan {
return statement;
}
+ /** Returns the {@code joinInfo} of this hash join. */
+ public HashJoinInfo getJoinInfo() {
+ return joinInfo;
+ }
+
+ /** Returns the {@code subPlans} array itself (not a copy). */
+ public SubPlan[] getSubPlans() {
+ return subPlans;
+ }
+
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(HashJoinPlan)} overload. */
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /**
+ * Estimates the cost of this hash join. The estimate adds up the hash-join cost of each
+ * sub-plan, the cost of any aggregation and ordering carried out with the delegate, and the
+ * cost of the child plans.
+ *
+ * @return the estimated cost, or {@link Cost#UNKNOWN} when a row-count or row-width estimate
+ * is missing, when the summed right-hand-side bytes exceed {@code serverCacheLimit}, or when
+ * reading an estimate throws a {@link SQLException}
+ */
@Override
public Cost getCost() {
- Long byteCount = null;
try {
- byteCount = getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
+ Long r = delegate.getEstimatedRowsToScan();
+ Double w = delegate.accept(new AvgRowWidthVisitor());
+ if (r == null || w == null) {
+ return Cost.UNKNOWN;
+ }
- if (byteCount == null) {
- return Cost.UNKNOWN;
- }
+ int parallelLevel = CostUtil.estimateParallelLevel(
+ true, getContext().getConnection().getQueryServices());
+
+ double rowWidth = w;
+ double rows = RowCountVisitor.filter(
+ r.doubleValue(),
+ RowCountVisitor.stripSkipScanFilter(
+ delegate.getContext().getScan().getFilter()));
+ double bytes = rowWidth * rows;
+ Cost cost = Cost.ZERO;
+ double rhsByteSum = 0.0;
+ // Fold the sub-plans in one at a time: the output of join i is the left input of join i + 1.
+ for (int i = 0; i < subPlans.length; i++) {
+ double lhsBytes = bytes;
+ Double rhsRows = subPlans[i].getInnerPlan().accept(new RowCountVisitor());
+ Double rhsWidth = subPlans[i].getInnerPlan().accept(new AvgRowWidthVisitor());
+ if (rhsRows == null || rhsWidth == null) {
+ return Cost.UNKNOWN;
+ }
+ double rhsBytes = rhsWidth * rhsRows;
+ rows = RowCountVisitor.join(rows, rhsRows, joinInfo.getJoinTypes()[i]);
+ rowWidth = AvgRowWidthVisitor.join(rowWidth, rhsWidth, joinInfo.getJoinTypes()[i]);
+ bytes = rowWidth * rows;
+ cost = cost.plus(CostUtil.estimateHashJoinCost(
+ lhsBytes, rhsBytes, bytes, subPlans[i].hasKeyRangeExpression(), parallelLevel));
+ rhsByteSum += rhsBytes;
+ }
- Cost cost = new Cost(0, 0, byteCount);
- Cost lhsCost = delegate.getCost();
- if (keyRangeExpressions != null) {
- // The selectivity of the dynamic rowkey filter.
- // TODO replace the constant with an estimate value.
- double selectivity = 0.01;
- lhsCost = lhsCost.multiplyBy(selectivity);
- }
- Cost rhsCost = Cost.ZERO;
- for (SubPlan subPlan : subPlans) {
- rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+ // The combined right-hand sides are compared against the configured server cache size;
+ // a plan that exceeds it is reported as having unknown cost.
+ if (rhsByteSum > serverCacheLimit) {
+ return Cost.UNKNOWN;
+ }
+
+ // Calculate the cost of aggregation and ordering that is performed with the HashJoinPlan
+ if (delegate instanceof AggregatePlan) {
+ AggregatePlan aggPlan = (AggregatePlan) delegate;
+ double rowsBeforeHaving = RowCountVisitor.aggregate(rows, aggPlan.getGroupBy());
+ double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, aggPlan.getHaving());
+ double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+ double bytesAfterHaving = rowWidth * rowsAfterHaving;
+ Cost aggCost = CostUtil.estimateAggregateCost(
+ bytes, bytesBeforeHaving, aggPlan.getGroupBy(), parallelLevel);
+ cost = cost.plus(aggCost);
+ rows = rowsAfterHaving;
+ bytes = bytesAfterHaving;
+ }
+ double outputRows = RowCountVisitor.limit(rows, delegate.getLimit());
+ double outputBytes = rowWidth * outputRows;
+ if (!delegate.getOrderBy().getOrderByExpressions().isEmpty()) {
+ // The sort gets its own parallel-level estimate, keyed on the delegate type, instead
+ // of reusing the join's parallelLevel.
+ int parallelLevel2 = CostUtil.estimateParallelLevel(
+ delegate instanceof ScanPlan, getContext().getConnection().getQueryServices());
+ Cost orderByCost = CostUtil.estimateOrderByCost(
+ bytes, outputBytes, parallelLevel2);
+ cost = cost.plus(orderByCost);
+ }
+
+ // Calculate the cost of child nodes
+ Cost lhsCost = new Cost(0, 0, r.doubleValue() * w);
+ Cost rhsCost = Cost.ZERO;
+ for (SubPlan subPlan : subPlans) {
+ rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+ }
+ return cost.plus(lhsCost).plus(rhsCost);
+ } catch (SQLException ignored) {
+ // Best effort: an estimate could not be read, so fall through to Cost.UNKNOWN below.
}
- return cost.plus(lhsCost).plus(rhsCost);
+ return Cost.UNKNOWN;
}
- protected interface SubPlan {
+ public interface SubPlan {
public ServerCache execute(HashJoinPlan parent) throws SQLException;
public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
public List<String> getPreSteps(HashJoinPlan parent) throws SQLException;
public List<String> getPostSteps(HashJoinPlan parent) throws SQLException;
public QueryPlan getInnerPlan();
+ /** Whether this sub-plan carries a key-range expression; fed into the hash-join cost estimate. */
+ public boolean hasKeyRangeExpression();
}
public static class WhereClauseSubPlan implements SubPlan {
@@ -383,6 +449,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
public QueryPlan getInnerPlan() {
return plan;
}
+
+ /** Always {@code false} for this sub-plan type. */
+ @Override
+ public boolean hasKeyRangeExpression() {
+ return false;
+ }
}
public static class HashSubPlan implements SubPlan {
@@ -495,6 +566,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
public QueryPlan getInnerPlan() {
return plan;
}
+
+ /** Returns {@code true} when {@code keyRangeLhsExpression} is set. */
+ @Override
+ public boolean hasKeyRangeExpression() {
+ return keyRangeLhsExpression != null;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index c9abb69..255fca3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -81,6 +82,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
}
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(LiteralResultIterationPlan)} overload. */
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, final Map<ImmutableBytesPtr,ServerCache> caches)
throws SQLException {
ResultIterator scanner = new ResultIterator() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d63950c..ed145a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -37,6 +37,8 @@ import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.BaseResultIterators;
@@ -202,16 +204,17 @@ public class ScanPlan extends BaseQueryPlan {
} catch (SQLException e) {
// ignored.
}
+ Double outputBytes = this.accept(new ByteCountVisitor());
- if (byteCount == null) {
+ if (byteCount == null || outputBytes == null) {
return Cost.UNKNOWN;
}
- Cost cost = new Cost(0, 0, byteCount);
int parallelLevel = CostUtil.estimateParallelLevel(
true, context.getConnection().getQueryServices());
+ Cost cost = new Cost(0, 0, byteCount);
if (!orderBy.getOrderByExpressions().isEmpty()) {
- Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+ Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return cost;
@@ -320,6 +323,11 @@ public class ScanPlan extends BaseQueryPlan {
}
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(ScanPlan)} overload. */
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Long getEstimatedRowsToScan() throws SQLException {
if (isSerial) {
return serialRowsEstimate;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 2436d1e..978c7b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -47,6 +47,8 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.MappedByteBufferQueue;
@@ -171,12 +173,7 @@ public class SortMergeJoinPlan implements QueryPlan {
@Override
public Cost getCost() {
- Long byteCount = null;
- try {
- byteCount = getEstimatedBytesToScan();
- } catch (SQLException e) {
- // ignored.
- }
+ Double byteCount = this.accept(new ByteCountVisitor());
if (byteCount == null) {
return Cost.UNKNOWN;
@@ -255,7 +252,11 @@ public class SortMergeJoinPlan implements QueryPlan {
public boolean isRowKeyOrdered() {
return false;
}
-
+
+ /** Returns the join type held in the {@code type} field. */
+ public JoinType getJoinType() {
+ return type;
+ }
+
private static SQLException closeIterators(ResultIterator lhsIterator, ResultIterator rhsIterator) {
SQLException e = null;
try {
@@ -717,6 +718,11 @@ public class SortMergeJoinPlan implements QueryPlan {
}
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(SortMergeJoinPlan)} overload. */
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Set<TableRef> getSourceRefs() {
return tableRefs;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index f42af56..f869a4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.DelegateResultIterator;
import org.apache.phoenix.iterate.FilterResultIterator;
@@ -78,4 +79,9 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
return iterator;
}
+
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(TupleProjectionPlan)} overload. */
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 3b5168c..6114d66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -112,6 +113,10 @@ public class UnionPlan implements QueryPlan {
return iterators.getScans();
}
+ /** Returns the {@code plans} list itself (not a copy). */
+ public List<QueryPlan> getSubPlans() {
+ return plans;
+ }
+
@Override
public GroupBy getGroupBy() {
return groupBy;
@@ -230,7 +235,12 @@ public class UnionPlan implements QueryPlan {
return false;
}
- @Override
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(UnionPlan)} overload. */
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Operation getOperation() {
return statement.getOperation();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 51cb67e..0bc3df4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.BaseSingleExpression;
import org.apache.phoenix.expression.BaseTerminalExpression;
import org.apache.phoenix.expression.Expression;
@@ -64,6 +65,11 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
return null;
}
+ /** Double-dispatch entry point: passes this plan to the visitor's {@code visit(UnnestArrayPlan)} overload. */
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
public class UnnestArrayResultIterator extends DelegateResultIterator {
private final UnnestArrayElemRefExpression elemRefExpression;
private final UnnestArrayElemIndexExpression elemIndexExpression;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
new file mode 100644
index 0000000..9525747
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.parse.JoinTableNode;
+
+import java.sql.SQLException;
+
+/**
+ * Implementation of QueryPlanVisitor used to get the average number of bytes per
+ * row for a QueryPlan. Every method returns {@code null} when no estimate is available.
+ */
+public class AvgRowWidthVisitor implements QueryPlanVisitor<Double> {
+
+    @Override
+    public Double defaultReturn(QueryPlan plan) {
+        return null;
+    }
+
+    /** Width derived from the plan's own byte and row scan estimates. */
+    @Override
+    public Double visit(AggregatePlan plan) {
+        return widthFromScanEstimates(plan);
+    }
+
+    /** Width derived from the plan's own byte and row scan estimates. */
+    @Override
+    public Double visit(ScanPlan plan) {
+        return widthFromScanEstimates(plan);
+    }
+
+    /** Same width as the delegate. */
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    /** Same width as the delegate. */
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    /** Uses the plan's {@code getEstimatedSize()} as the width. */
+    @Override
+    public Double visit(LiteralResultIterationPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+    /** Same width as the delegate. */
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    /**
+     * Starts from the delegate's width and folds in each sub-plan's width with
+     * {@link #join(double, double, JoinTableNode.JoinType)}, in join order.
+     */
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        Double lhsWidth = plan.getDelegate().accept(this);
+        if (lhsWidth == null) {
+            return null;
+        }
+        JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes();
+        HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans();
+        Double width = lhsWidth;
+        for (int i = 0; i < joinTypes.length; i++) {
+            Double rhsWidth = subPlans[i].getInnerPlan().accept(this);
+            if (rhsWidth == null) {
+                return null;
+            }
+            width = join(width, rhsWidth, joinTypes[i]);
+        }
+
+        return width;
+    }
+
+    /** Combines the two sides' widths according to the plan's join type. */
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        Double lhsWidth = plan.getLhsPlan().accept(this);
+        Double rhsWidth = plan.getRhsPlan().accept(this);
+        if (lhsWidth == null || rhsWidth == null) {
+            return null;
+        }
+
+        return join(lhsWidth, rhsWidth, plan.getJoinType());
+    }
+
+    /**
+     * Unweighted mean of the sub-plans' widths; {@code null} if any sub-plan has no estimate
+     * or if the union has no sub-plans.
+     */
+    @Override
+    public Double visit(UnionPlan plan) {
+        // Guard the division below: 0.0 / 0 would otherwise leak NaN into the cost model.
+        if (plan.getSubPlans().isEmpty()) {
+            return null;
+        }
+        Double sum = 0.0;
+        for (QueryPlan subPlan : plan.getSubPlans()) {
+            Double avgWidth = subPlan.accept(this);
+            if (avgWidth == null) {
+                return null;
+            }
+            sum += avgWidth;
+        }
+
+        return sum / plan.getSubPlans().size();
+    }
+
+    /** Same width as the delegate. */
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    /** Same width as the delegate. */
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    /** Same width as the delegate. */
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    /** Uses the plan's {@code getEstimatedSize()} as the width. */
+    @Override
+    public Double visit(ListJarsQueryPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+    /** Uses the plan's {@code getEstimatedSize()} as the width. */
+    @Override
+    public Double visit(TraceQueryPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+    /**
+     * Shared implementation for plans that expose scan estimates directly.
+     *
+     * @return {@code 0.0} when the byte estimate is zero, bytes divided by rows when the row
+     *         estimate is non-zero, and {@code null} when either estimate is missing, the row
+     *         estimate is zero with a non-zero byte estimate, or reading them fails
+     */
+    private static Double widthFromScanEstimates(QueryPlan plan) {
+        try {
+            Long byteCount = plan.getEstimatedBytesToScan();
+            Long rowCount = plan.getEstimatedRowsToScan();
+            if (byteCount != null && rowCount != null) {
+                if (byteCount == 0) {
+                    return 0.0;
+                }
+                if (rowCount != 0) {
+                    return ((double) byteCount) / rowCount;
+                }
+            }
+        } catch (SQLException ignored) {
+            // Best effort: an unreadable estimate is reported as "no estimate" below.
+        }
+
+        return null;
+    }
+
+
+    /*
+     * The below methods provide estimation of row width based on the input row width as well as
+     * the operator.
+     */
+
+    /**
+     * Width of a joined row: both sides' widths added together for inner/left/right/full joins,
+     * the left side's width alone for semi/anti joins.
+     *
+     * @throws IllegalArgumentException for any other join type
+     */
+    public static double join(double lhsWidth, double rhsWidth, JoinTableNode.JoinType type) {
+        double width;
+        switch (type) {
+        case Inner:
+        case Left:
+        case Right:
+        case Full: {
+            width = lhsWidth + rhsWidth;
+            break;
+        }
+        case Semi:
+        case Anti: {
+            width = lhsWidth;
+            break;
+        }
+        default: {
+            throw new IllegalArgumentException("Invalid join type: " + type);
+        }
+        }
+        return width;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
new file mode 100644
index 0000000..61a2895
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+
+/**
+ * QueryPlanVisitor that estimates how many bytes a QueryPlan outputs. Every plan type is
+ * handled the same way: estimated row count multiplied by estimated average row width.
+ */
+public class ByteCountVisitor implements QueryPlanVisitor<Double> {
+
+    /** No estimate by default. */
+    @Override
+    public Double defaultReturn(QueryPlan plan) {
+        return null;
+    }
+
+    @Override
+    public Double visit(AggregatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ScanPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(LiteralResultIterationPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(UnionPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ListJarsQueryPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(TraceQueryPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    /**
+     * Runs a fresh RowCountVisitor and a fresh AvgRowWidthVisitor over the plan and multiplies
+     * their results.
+     *
+     * @return rows times width, or {@code null} if either visitor produced no estimate
+     */
+    protected Double getByteCountFromRowCountAndRowWidth(QueryPlan plan) {
+        final Double rows = plan.accept(new RowCountVisitor());
+        final Double width = plan.accept(new AvgRowWidthVisitor());
+        final boolean bothKnown = rows != null && width != null;
+        return bothKnown ? Double.valueOf(rows * width) : null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
new file mode 100644
index 0000000..a7ae3af
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.*;
+
+/**
+ *
+ * Visitor for a QueryPlan (which may contain other nested query-plans).
+ * Declares one {@code visit} overload per concrete plan type, plus
+ * {@link #defaultReturn(QueryPlan)} for plans that have no dedicated overload.
+ *
+ * @param <E> the type of the value produced by visiting a plan
+ *
+ */
+public interface QueryPlanVisitor<E> {
+ E defaultReturn(QueryPlan plan); // fallback result for plans without their own visit overload
+ E visit(AggregatePlan plan);
+ E visit(ScanPlan plan);
+ E visit(ClientAggregatePlan plan);
+ E visit(ClientScanPlan plan);
+ E visit(LiteralResultIterationPlan plan);
+ E visit(TupleProjectionPlan plan);
+ E visit(HashJoinPlan plan);
+ E visit(SortMergeJoinPlan plan);
+ E visit(UnionPlan plan);
+ E visit(UnnestArrayPlan plan);
+ E visit(CorrelatePlan plan);
+ E visit(CursorFetchPlan plan);
+ E visit(ListJarsQueryPlan plan);
+ E visit(TraceQueryPlan plan);
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
new file mode 100644
index 0000000..58ceea9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.phoenix.compile.GroupByCompiler;
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.filter.BooleanExpressionFilter;
+import org.apache.phoenix.parse.JoinTableNode;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of QueryPlanVisitor used to get the number of output rows for a QueryPlan.
+ */
+public class RowCountVisitor implements QueryPlanVisitor<Double> {
+
+ // An estimate of the ratio of result data from group-by against the input data.
+ private final static double GROUPING_FACTOR = 0.1;
+
+ private final static double OUTER_JOIN_FACTOR = 1.15;
+ private final static double INNER_JOIN_FACTOR = 0.85;
+ private final static double SEMI_OR_ANTI_JOIN_FACTOR = 0.5;
+
+ private final static double UNION_DISTINCT_FACTOR = 0.8;
+
+ @Override
+ public Double defaultReturn(QueryPlan plan) {
+ return null;
+ }
+
+ @Override
+ public Double visit(AggregatePlan plan) {
+ try {
+ Long b = plan.getEstimatedRowsToScan();
+ if (b != null) {
+ return limit(
+ filter(
+ aggregate(
+ filter(
+ b.doubleValue(),
+ stripSkipScanFilter(
+ plan.getContext().getScan().getFilter())),
+ plan.getGroupBy()),
+ plan.getHaving()),
+ plan.getLimit());
+ }
+ } catch (SQLException e) {
+ }
+
+ return null;
+ }
+
+ @Override
+ public Double visit(ScanPlan plan) {
+ try {
+ Long b = plan.getEstimatedRowsToScan();
+ if (b != null) {
+ return limit(
+ filter(
+ b.doubleValue(),
+ stripSkipScanFilter(plan.getContext().getScan().getFilter())),
+ plan.getLimit());
+ }
+ } catch (SQLException e) {
+ }
+
+ return null;
+ }
+
+ @Override
+ public Double visit(ClientAggregatePlan plan) {
+ Double b = plan.getDelegate().accept(this);
+ if (b != null) {
+ return limit(
+ filter(
+ aggregate(
+ filter(b.doubleValue(), plan.getWhere()),
+ plan.getGroupBy()),
+ plan.getHaving()),
+ plan.getLimit());
+ }
+
+ return null;
+ }
+
+ /**
+  * Estimates the output rows of a ClientScanPlan. When the plan has a LIMIT, the limit
+  * itself is the estimate and the delegate is not consulted; otherwise the delegate's
+  * estimate is reduced by the WHERE expression.
+  *
+  * @return the estimate, or null when there is no LIMIT and the delegate's row count is unknown
+  */
+ @Override
+ public Double visit(ClientScanPlan plan) {
+     Integer rowLimit = plan.getLimit();
+     if (rowLimit != null) {
+         return (double) rowLimit;
+     }
+
+     Double inputRows = plan.getDelegate().accept(this);
+     if (inputRows == null) {
+         return null;
+     }
+     // No LIMIT applies on this path, so only the WHERE expression shrinks the estimate.
+     return filter(inputRows.doubleValue(), plan.getWhere());
+ }
+
+ @Override
+ public Double visit(LiteralResultIterationPlan plan) {
+ return 1.0;
+ }
+
+ @Override
+ public Double visit(TupleProjectionPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ /**
+  * Estimates the output rows of a HashJoinPlan. Starts from the delegate plan's estimated
+  * rows to scan (reduced by its stripped scan filter), folds in each sub-plan's estimate
+  * with the matching join type, then applies the delegate's group-by and HAVING (only when
+  * the delegate is an AggregatePlan) and finally the delegate's LIMIT.
+  *
+  * @return the estimate, or null when any input estimate is unknown or a SQLException occurs
+  */
+ @Override
+ public Double visit(HashJoinPlan plan) {
+     try {
+         QueryPlan leftPlan = plan.getDelegate();
+         Long leftRowsToScan = leftPlan.getEstimatedRowsToScan();
+         if (leftRowsToScan == null) {
+             return null;
+         }
+
+         Filter leftScanFilter = stripSkipScanFilter(leftPlan.getContext().getScan().getFilter());
+         double estimate = filter(leftRowsToScan.doubleValue(), leftScanFilter);
+
+         JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes();
+         HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans();
+         for (int idx = 0; idx < joinTypes.length; idx++) {
+             Double rightRows = subPlans[idx].getInnerPlan().accept(this);
+             if (rightRows == null) {
+                 return null;
+             }
+             estimate = join(estimate, rightRows.doubleValue(), joinTypes[idx]);
+         }
+
+         if (leftPlan instanceof AggregatePlan) {
+             AggregatePlan leftAggregate = (AggregatePlan) leftPlan;
+             double grouped = aggregate(estimate, leftAggregate.getGroupBy());
+             estimate = filter(grouped, leftAggregate.getHaving());
+         }
+         return limit(estimate, leftPlan.getLimit());
+     } catch (SQLException ignored) {
+         // Best effort: an estimate that cannot be computed is reported as unknown.
+         return null;
+     }
+ }
+
+ /**
+  * Estimates the output rows of a SortMergeJoinPlan by joining the estimates of its two
+  * inputs according to the plan's join type. Both inputs are always visited.
+  *
+  * @return the estimate, or null when either input's row count is unknown
+  */
+ @Override
+ public Double visit(SortMergeJoinPlan plan) {
+     Double leftRows = plan.getLhsPlan().accept(this);
+     Double rightRows = plan.getRhsPlan().accept(this);
+     if (leftRows == null || rightRows == null) {
+         return null;
+     }
+
+     return join(leftRows, rightRows, plan.getJoinType());
+ }
+
+ /**
+  * Estimates the output rows of a UnionPlan as the sum of its sub-plans' estimates (the
+  * union is treated as UNION ALL), followed by the plan's LIMIT.
+  *
+  * @return the estimate, or null as soon as one sub-plan's row count is unknown
+  */
+ @Override
+ public Double visit(UnionPlan plan) {
+     List<QueryPlan> children = plan.getSubPlans();
+     double[] childRows = new double[children.size()];
+     for (int idx = 0; idx < childRows.length; idx++) {
+         Double rows = children.get(idx).accept(this);
+         if (rows == null) {
+             return null;
+         }
+         childRows[idx] = rows.doubleValue();
+     }
+
+     double total = union(true, childRows);
+     return limit(total, plan.getLimit());
+ }
+
+ @Override
+ public Double visit(UnnestArrayPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ /**
+  * Estimates the output rows of a CorrelatePlan as the delegate's estimate scaled by
+  * SEMI_OR_ANTI_JOIN_FACTOR.
+  *
+  * @return the estimate, or null when the delegate's row count is unknown
+  */
+ @Override
+ public Double visit(CorrelatePlan plan) {
+     Double leftRows = plan.getDelegate().accept(this);
+     if (leftRows == null) {
+         return null;
+     }
+
+     return leftRows * SEMI_OR_ANTI_JOIN_FACTOR;
+ }
+
+ @Override
+ public Double visit(CursorFetchPlan plan) {
+ return plan.getDelegate().accept(this);
+ }
+
+ @Override
+ public Double visit(ListJarsQueryPlan plan) {
+ return 0.0;
+ }
+
+ @Override
+ public Double visit(TraceQueryPlan plan) {
+ return 0.0;
+ }
+
+ public static Filter stripSkipScanFilter(Filter filter) {
+ if (filter == null) {
+ return null;
+ }
+ if (!(filter instanceof FilterList)) {
+ return filter instanceof BooleanExpressionFilter ? filter : null;
+ }
+ FilterList filterList = (FilterList) filter;
+ if (filterList.getOperator() != FilterList.Operator.MUST_PASS_ALL) {
+ return filter;
+ }
+ List<Filter> list = new ArrayList<>();
+ for (Filter f : filterList.getFilters()) {
+ Filter stripped = stripSkipScanFilter(f);
+ if (stripped != null) {
+ list.add(stripped);
+ }
+ }
+ return list.isEmpty() ? null : (list.size() == 1 ? list.get(0) : new FilterList(FilterList.Operator.MUST_PASS_ALL, list));
+ }
+
+
+ /*
+ * The below methods provide estimation of row count based on the input row count as well as
+ * the operator. They should be replaced by more accurate calculation based on histogram and
+ * a logical operator layer is expected to facilitate this.
+ */
+
+ /**
+  * Estimates the rows that pass an HBase filter: half of the input when a filter is
+  * present, all of it otherwise.
+  *
+  * @param inputRows estimated number of input rows
+  * @param filter the filter, or null for none
+  * @return the estimated number of output rows
+  */
+ public static double filter(double inputRows, Filter filter) {
+     return filter == null ? inputRows : 0.5 * inputRows;
+ }
+
+ /**
+  * Estimates the rows that pass a filter expression: half of the input when an expression
+  * is present, all of it otherwise.
+  *
+  * @param inputRows estimated number of input rows
+  * @param filter the filter expression, or null for none
+  * @return the estimated number of output rows
+  */
+ public static double filter(double inputRows, Expression filter) {
+     return filter == null ? inputRows : 0.5 * inputRows;
+ }
+
+ /**
+  * Estimates the rows produced by an aggregation: exactly one row for an ungrouped
+  * aggregate, otherwise GROUPING_FACTOR times the input.
+  *
+  * @param inputRows estimated number of input rows
+  * @param groupBy the compiled group-by; must not be null
+  * @return the estimated number of output rows
+  */
+ public static double aggregate(double inputRows, GroupByCompiler.GroupBy groupBy) {
+     return groupBy.isUngroupedAggregate() ? 1.0 : GROUPING_FACTOR * inputRows;
+ }
+
+ /**
+  * Applies a LIMIT to a row-count estimate.
+  *
+  * @param inputRows estimated number of input rows
+  * @param limit the row limit, or null when the plan has no LIMIT
+  * @return {@code inputRows} when there is no limit, otherwise the smaller of
+  *         {@code inputRows} and {@code limit}
+  */
+ public static double limit(double inputRows, Integer limit) {
+     if (limit == null) {
+         return inputRows;
+     }
+     // A LIMIT can only cut rows off: never report more rows than the input is
+     // estimated to contain.
+     return Math.min(inputRows, limit.doubleValue());
+ }
+
+ /**
+  * Estimates the rows produced by a chain of joins, applying the pairwise estimate to the
+  * running result for each right-hand input in array order.
+  *
+  * @param lhsRows estimated rows of the left-most input
+  * @param rhsRows estimated rows of each right-hand input
+  * @param types the join type for each right-hand input; same length as {@code rhsRows}
+  * @return the estimated number of output rows
+  */
+ public static double join(double lhsRows, double[] rhsRows, JoinTableNode.JoinType[] types) {
+     assert rhsRows.length == types.length;
+     double estimate = lhsRows;
+     int idx = 0;
+     while (idx < rhsRows.length) {
+         estimate = join(estimate, rhsRows[idx], types[idx]);
+         idx++;
+     }
+     return estimate;
+ }
+
+ /**
+  * Estimates the rows produced by joining two inputs.
+  * <ul>
+  * <li>Inner: the smaller input times INNER_JOIN_FACTOR;</li>
+  * <li>Left, Right, Full: the larger input times OUTER_JOIN_FACTOR;</li>
+  * <li>Semi, Anti: the left input times SEMI_OR_ANTI_JOIN_FACTOR.</li>
+  * </ul>
+  *
+  * @param lhsRows estimated rows of the left input
+  * @param rhsRows estimated rows of the right input
+  * @param type the join type
+  * @return the estimated number of output rows
+  * @throws IllegalArgumentException for any other join type
+  */
+ public static double join(double lhsRows, double rhsRows, JoinTableNode.JoinType type) {
+     switch (type) {
+         case Inner:
+             return Math.min(lhsRows, rhsRows) * INNER_JOIN_FACTOR;
+         case Left:
+         case Right:
+         case Full:
+             return Math.max(lhsRows, rhsRows) * OUTER_JOIN_FACTOR;
+         case Semi:
+         case Anti:
+             return lhsRows * SEMI_OR_ANTI_JOIN_FACTOR;
+         default:
+             throw new IllegalArgumentException("Invalid join type: " + type);
+     }
+ }
+
+ /**
+  * Estimates the rows produced by a union: the sum of the inputs, scaled by
+  * UNION_DISTINCT_FACTOR when the union is not UNION ALL.
+  *
+  * @param all true for UNION ALL, false for a distinct union
+  * @param inputRows estimated rows of each input
+  * @return the estimated number of output rows
+  */
+ public static double union(boolean all, double... inputRows) {
+     double total = 0.0;
+     for (int idx = 0; idx < inputRows.length; idx++) {
+         total += inputRows[idx];
+     }
+     return all ? total : total * UNION_DISTINCT_FACTOR;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index da8beae..a55af6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -86,6 +86,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.exception.UpgradeRequiredException;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.iterate.MaterializedResultIterator;
@@ -731,6 +732,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.defaultReturn(this);
+ }
+
+ @Override
public Long getEstimatedRowsToScan() {
return estimatedRows;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
index 1d4b8e0..db2b5ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
@@ -30,51 +30,52 @@ import org.apache.phoenix.query.QueryServices;
*/
public class CostUtil {
- // An estimate of the ratio of result data from group-by against the input data.
- private final static double GROUPING_FACTOR = 0.1;
-
- // Io operations conducted in intermediate evaluations like sorting or aggregation
- // should be counted twice since they usually involve both read and write.
- private final static double IO_COST_MULTIPLIER = 2.0;
-
/**
- * Estimate the number of output bytes of an aggregate.
- * @param byteCount the number of input bytes
+ * Estimate the cost of an aggregate.
+ * @param inputBytes the number of input bytes
+ * @param outputBytes the number of output bytes
* @param groupBy the compiled GroupBy object
- * @param aggregatorsSize the byte size of aggregators
- * @return the output byte count
+ * @param parallelLevel number of parallel workers or threads
+ * @return the cost
*/
- public static double estimateAggregateOutputBytes(
- double byteCount, GroupBy groupBy, int aggregatorsSize) {
- if (groupBy.isUngroupedAggregate()) {
- return aggregatorsSize;
- }
- return byteCount * GROUPING_FACTOR;
+ public static Cost estimateAggregateCost(
+         double inputBytes, double outputBytes, GroupBy groupBy, int parallelLevel) {
+     // Clamp the input size so that Math.log() below never yields a negative value or
+     // -Infinity for an empty (or sub-byte) input estimate. estimateOrderByCost and
+     // estimateHashJoinCost clamp their logarithm argument the same way.
+     if (inputBytes < 1) {
+         inputBytes = 1;
+     }
+     // Order-preserving and ungrouped aggregates are charged a constant overhead of 1;
+     // every other aggregate is charged in proportion to its output size (at least 1).
+     double hashMapOverhead;
+     if (groupBy.isOrderPreserving() || groupBy.isUngroupedAggregate()) {
+         hashMapOverhead = 1;
+     } else {
+         hashMapOverhead = outputBytes < 1 ? 1 : outputBytes;
+     }
+     return new Cost(0, 0, (outputBytes + hashMapOverhead * Math.log(inputBytes)) / parallelLevel);
+ }
/**
- * Estimate the cost of an aggregate.
- * @param byteCount the number of input bytes
- * @param groupBy the compiled GroupBy object
- * @param aggregatorsSize the byte size of aggregators
+ * Estimate the cost of an order-by
+ * @param inputBytes the number of input bytes
+ * @param outputBytes the number of output bytes, which may be different from inputBytes
+ * depending on whether there is a LIMIT
* @param parallelLevel number of parallel workers or threads
* @return the cost
*/
- public static Cost estimateAggregateCost(
- double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) {
- double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize);
- double orderedFactor = groupBy.isOrderPreserving() ? 0.2 : 1.0;
- return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel);
+ public static Cost estimateOrderByCost(double inputBytes, double outputBytes, int parallelLevel) {
+ if (inputBytes < 1) {
+ inputBytes = 1;
+ }
+ return new Cost(0, 0,
+ (outputBytes + outputBytes * Math.log(inputBytes)) / parallelLevel);
}
/**
- * Estimate the cost of an order-by
- * @param byteCount the number of input bytes
+ * Estimate the cost of a hash-join
+ * @param lhsBytes the number of left input bytes
+ * @param rhsBytes the number of right input bytes
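+ * @param outputBytes the number of output bytes
+ * @param hasKeyRangeExpression whether the join has a key-range expression; when true the
+ * left input bytes are not added to the cost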
* @param parallelLevel number of parallel workers or threads
* @return the cost
*/
- public static Cost estimateOrderByCost(double byteCount, int parallelLevel) {
- return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel);
+ public static Cost estimateHashJoinCost(
+         double lhsBytes, double rhsBytes, double outputBytes,
+         boolean hasKeyRangeExpression, int parallelLevel) {
+     // A right-hand side below one byte is treated as one byte before taking the logarithm.
+     double clampedRhsBytes = rhsBytes < 1 ? 1 : rhsBytes;
+     double rhsTerm = clampedRhsBytes * Math.log(clampedRhsBytes);
+     // The left input is only charged when there is no key-range expression.
+     double lhsTerm = hasKeyRangeExpression ? 0 : lhsBytes;
+     return new Cost(0, 0, (rhsTerm + lhsTerm) / parallelLevel + outputBytes);
+ }
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 1903dda..69aeaad 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.iterate.ParallelIterators;
import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -474,6 +475,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.defaultReturn(this);
+ }
+
+ @Override
public Long getEstimatedRowsToScan() {
return null;
}
[9/9] phoenix git commit: PHOENIX-4611 Not nullable column impact on
join query plans
Posted by ma...@apache.org.
PHOENIX-4611 Not nullable column impact on join query plans
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9bb7811f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9bb7811f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9bb7811f
Branch: refs/heads/4.x-HBase-0.98
Commit: 9bb7811f001d00cea42da6185c3645d7d14e4a16
Parents: babda32
Author: maryannxue <ma...@gmail.com>
Authored: Fri Feb 16 21:03:46 2018 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 21:31:58 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java | 8 ++++----
.../java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java | 4 ++--
.../main/java/org/apache/phoenix/compile/JoinCompiler.java | 4 ++++
3 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb7811f/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
index a132728..dea349a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
@@ -440,8 +440,8 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
" CLIENT MERGE SORT\n" +
" CLIENT SORTED BY [BUCKET, \"TIMESTAMP\"]\n" +
- "CLIENT SORTED BY [E.BUCKET, E.TIMESTAMP]\n" +
- "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, E.TIMESTAMP]"
+ "CLIENT SORTED BY [E.TIMESTAMP, E.BUCKET]\n" +
+ "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.TIMESTAMP, E.BUCKET]"
:
"SORT-MERGE-JOIN (INNER) TABLES\n" +
" CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + eventCountTableName + " [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
@@ -456,8 +456,8 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
" SERVER DISTINCT PREFIX FILTER OVER [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
" CLIENT MERGE SORT\n" +
- "CLIENT SORTED BY [E.BUCKET, E.TIMESTAMP]\n" +
- "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, E.TIMESTAMP]";
+ "CLIENT SORTED BY [E.TIMESTAMP, E.BUCKET]\n" +
+ "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.TIMESTAMP, E.BUCKET]";
ResultSet rs = conn.createStatement().executeQuery("explain " + q);
assertEquals(p, QueryUtil.getExplainPlan(rs));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb7811f/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
index f09f1d3..3a1b015 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
@@ -786,7 +786,7 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
String p = i == 0 ?
"CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER EVENT_COUNT [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
" SERVER FILTER BY FIRST KEY ONLY\n" +
- " SERVER AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, \"E.TIMESTAMP\"]\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"E.TIMESTAMP\", E.BUCKET]\n" +
"CLIENT MERGE SORT\n" +
" PARALLEL INNER-JOIN TABLE 0 (SKIP MERGE)\n" +
" CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + t[i] + " [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
@@ -795,7 +795,7 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
:
"CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER EVENT_COUNT [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
" SERVER FILTER BY FIRST KEY ONLY\n" +
- " SERVER AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, \"E.TIMESTAMP\"]\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"E.TIMESTAMP\", E.BUCKET]\n" +
"CLIENT MERGE SORT\n" +
" PARALLEL INNER-JOIN TABLE 0 (SKIP MERGE)\n" +
" CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + t[i] + " [0,'5SEC',1462993420000000001,'Tr/Bal'] - [1,'5SEC',1462993520000000000,'Tr/Bal']\n" +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb7811f/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 4020cf9..cf5a5dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -561,6 +561,10 @@ public class JoinCompiler {
}
compiled.add(new Pair<Expression, Expression>(left, right));
}
+ // TODO PHOENIX-4618:
+ // For Strategy.SORT_MERGE, we probably need to re-order the join keys based on the
+ // specific ordering required by the join's parent, or re-order the following way
+ // to align with group-by expressions' re-ordering.
if (strategy != Strategy.SORT_MERGE) {
Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
@Override
[4/9] phoenix git commit: PHOENIX-4437 Make
QueryPlan.getEstimatedBytesToScan() independent of getExplainPlan() and pull
optimize() out of getExplainPlan()
Posted by ma...@apache.org.
PHOENIX-4437 Make QueryPlan.getEstimatedBytesToScan() independent of getExplainPlan() and pull optimize() out of getExplainPlan()
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7ef96fe1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7ef96fe1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7ef96fe1
Branch: refs/heads/4.x-HBase-0.98
Commit: 7ef96fe1bed43f3ac3dae900a3e6a83791faf697
Parents: 977699a
Author: maryannxue <ma...@gmail.com>
Authored: Thu Dec 21 10:31:04 2017 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 20:08:24 2018 -0700
----------------------------------------------------------------------
.../end2end/ExplainPlanWithStatsEnabledIT.java | 4 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 45 ++++++--------
.../apache/phoenix/execute/HashJoinPlan.java | 59 +++++++++---------
.../phoenix/execute/SortMergeJoinPlan.java | 63 ++++++++++----------
.../org/apache/phoenix/execute/UnionPlan.java | 53 ++++++++--------
.../apache/phoenix/jdbc/PhoenixStatement.java | 9 ++-
6 files changed, 120 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index 76ed7ba..a835e84 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -303,8 +303,8 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(false);
Estimate info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 200L, info.estimatedBytes);
- assertEquals((Long) 2L, info.estimatedRows);
+ assertEquals((Long) 176l, info.estimatedBytes);
+ assertEquals((Long) 2l, info.estimatedRows);
assertTrue(info.estimateInfoTs > 0);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 38ed926..c4edf31 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -118,7 +118,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
protected Long estimatedRows;
protected Long estimatedSize;
protected Long estimateInfoTimestamp;
- private boolean explainPlanCalled;
+ private boolean getEstimatesCalled;
protected BaseQueryPlan(
@@ -500,32 +500,17 @@ public abstract class BaseQueryPlan implements QueryPlan {
@Override
public ExplainPlan getExplainPlan() throws SQLException {
- explainPlanCalled = true;
if (context.getScanRanges() == ScanRanges.NOTHING) {
return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString()));
}
- // If cost-based optimizer is enabled, we need to initialize a dummy iterator to
- // get the stats for computing costs.
- boolean costBased =
- context.getConnection().getQueryServices().getConfiguration().getBoolean(
- QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
- if (costBased) {
- ResultIterator iterator = iterator();
- iterator.close();
- }
- // Optimize here when getting explain plan, as queries don't get optimized until after compilation
- QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
- ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan();
- if (!costBased) { // do not override estimates if they are used for cost calculation.
- this.estimatedRows = plan.getEstimatedRowsToScan();
- this.estimatedSize = plan.getEstimatedBytesToScan();
- this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
- }
- return exp;
+ ResultIterator iterator = iterator();
+ ExplainPlan explainPlan = new ExplainPlan(getPlanSteps(iterator));
+ iterator.close();
+ return explainPlan;
}
- private List<String> getPlanSteps(ResultIterator iterator){
+ private List<String> getPlanSteps(ResultIterator iterator) {
List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
iterator.explain(planSteps);
return planSteps;
@@ -538,26 +523,32 @@ public abstract class BaseQueryPlan implements QueryPlan {
@Override
public Long getEstimatedRowsToScan() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimatedRows;
}
@Override
public Long getEstimatedBytesToScan() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimatedSize;
}
@Override
public Long getEstimateInfoTimestamp() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimateInfoTimestamp;
}
+ /**
+  * Populates the estimate fields by creating and immediately closing an iterator.
+  * The flag is raised before the iterator is created, so the estimate getters invoke this
+  * method at most once.
+  */
+ private void getEstimates() throws SQLException {
+     getEstimatesCalled = true;
+     // Initialize a dummy iterator to get the estimates based on stats.
+     iterator().close();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2d2ff4e..23a0da6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -99,7 +99,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
private Long estimatedRows;
private Long estimatedBytes;
private Long estimateInfoTs;
- private boolean explainPlanCalled;
+ private boolean getEstimatesCalled;
public static HashJoinPlan create(SelectStatement statement,
QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException {
@@ -247,7 +247,6 @@ public class HashJoinPlan extends DelegateQueryPlan {
@Override
public ExplainPlan getExplainPlan() throws SQLException {
- explainPlanCalled = true;
List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
int count = subPlans.length;
for (int i = 0; i < count; i++) {
@@ -263,26 +262,6 @@ public class HashJoinPlan extends DelegateQueryPlan {
if (joinInfo != null && joinInfo.getLimit() != null) {
planSteps.add(" JOIN-SCANNER " + joinInfo.getLimit() + " ROW LIMIT");
}
- for (SubPlan subPlan : subPlans) {
- if (subPlan.getInnerPlan().getEstimatedBytesToScan() == null
- || subPlan.getInnerPlan().getEstimatedRowsToScan() == null
- || subPlan.getInnerPlan().getEstimateInfoTimestamp() == null) {
- /*
- * If any of the sub plans doesn't have the estimate info available, then we don't
- * provide estimate for the overall plan
- */
- estimatedBytes = null;
- estimatedRows = null;
- estimateInfoTs = null;
- break;
- } else {
- estimatedBytes =
- add(estimatedBytes, subPlan.getInnerPlan().getEstimatedBytesToScan());
- estimatedRows = add(estimatedRows, subPlan.getInnerPlan().getEstimatedRowsToScan());
- estimateInfoTs =
- getMin(estimateInfoTs, subPlan.getInnerPlan().getEstimateInfoTimestamp());
- }
- }
return new ExplainPlan(planSteps);
}
@@ -520,27 +499,51 @@ public class HashJoinPlan extends DelegateQueryPlan {
@Override
public Long getEstimatedRowsToScan() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimatedRows;
}
@Override
public Long getEstimatedBytesToScan() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimatedBytes;
}
@Override
public Long getEstimateInfoTimestamp() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimateInfoTs;
}
+
+ private void getEstimates() throws SQLException {
+ getEstimatesCalled = true;
+ for (SubPlan subPlan : subPlans) {
+ if (subPlan.getInnerPlan().getEstimatedBytesToScan() == null
+ || subPlan.getInnerPlan().getEstimatedRowsToScan() == null
+ || subPlan.getInnerPlan().getEstimateInfoTimestamp() == null) {
+ /*
+ * If any of the sub plans doesn't have the estimate info available, then we don't
+ * provide estimate for the overall plan
+ */
+ estimatedBytes = null;
+ estimatedRows = null;
+ estimateInfoTs = null;
+ break;
+ } else {
+ estimatedBytes =
+ add(estimatedBytes, subPlan.getInnerPlan().getEstimatedBytesToScan());
+ estimatedRows = add(estimatedRows, subPlan.getInnerPlan().getEstimatedRowsToScan());
+ estimateInfoTs =
+ getMin(estimateInfoTs, subPlan.getInnerPlan().getEstimateInfoTimestamp());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 3e380da..2436d1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -95,7 +95,7 @@ public class SortMergeJoinPlan implements QueryPlan {
private Long estimatedBytes;
private Long estimatedRows;
private Long estimateInfoTs;
- private boolean explainPlanCalled;
+ private boolean getEstimatesCalled;
public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table,
JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
@@ -157,7 +157,6 @@ public class SortMergeJoinPlan implements QueryPlan {
@Override
public ExplainPlan getExplainPlan() throws SQLException {
- explainPlanCalled = true;
List<String> steps = Lists.newArrayList();
steps.add("SORT-MERGE-JOIN (" + type.toString().toUpperCase() + ") TABLES");
for (String step : lhsPlan.getExplainPlan().getPlanSteps()) {
@@ -167,28 +166,6 @@ public class SortMergeJoinPlan implements QueryPlan {
for (String step : rhsPlan.getExplainPlan().getPlanSteps()) {
steps.add(" " + step);
}
- if ((lhsPlan.getEstimatedBytesToScan() == null || rhsPlan.getEstimatedBytesToScan() == null)
- || (lhsPlan.getEstimatedRowsToScan() == null
- || rhsPlan.getEstimatedRowsToScan() == null)
- || (lhsPlan.getEstimateInfoTimestamp() == null
- || rhsPlan.getEstimateInfoTimestamp() == null)) {
- /*
- * If any of the sub plans doesn't have the estimate info available, then we don't
- * provide estimate for the overall plan
- */
- estimatedBytes = null;
- estimatedRows = null;
- estimateInfoTs = null;
- } else {
- estimatedBytes =
- add(add(estimatedBytes, lhsPlan.getEstimatedBytesToScan()),
- rhsPlan.getEstimatedBytesToScan());
- estimatedRows =
- add(add(estimatedRows, lhsPlan.getEstimatedRowsToScan()),
- rhsPlan.getEstimatedRowsToScan());
- estimateInfoTs =
- getMin(lhsPlan.getEstimateInfoTimestamp(), rhsPlan.getEstimateInfoTimestamp());
- }
return new ExplainPlan(steps);
}
@@ -754,25 +731,51 @@ public class SortMergeJoinPlan implements QueryPlan {
@Override
public Long getEstimatedRowsToScan() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimatedRows;
}
@Override
public Long getEstimatedBytesToScan() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimatedBytes;
}
@Override
public Long getEstimateInfoTimestamp() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimateInfoTs;
}
+
+ private void getEstimates() throws SQLException {
+ getEstimatesCalled = true;
+ if ((lhsPlan.getEstimatedBytesToScan() == null || rhsPlan.getEstimatedBytesToScan() == null)
+ || (lhsPlan.getEstimatedRowsToScan() == null
+ || rhsPlan.getEstimatedRowsToScan() == null)
+ || (lhsPlan.getEstimateInfoTimestamp() == null
+ || rhsPlan.getEstimateInfoTimestamp() == null)) {
+ /*
+ * If any of the sub plans doesn't have the estimate info available, then we don't
+ * provide estimate for the overall plan
+ */
+ estimatedBytes = null;
+ estimatedRows = null;
+ estimateInfoTs = null;
+ } else {
+ estimatedBytes =
+ add(add(estimatedBytes, lhsPlan.getEstimatedBytesToScan()),
+ rhsPlan.getEstimatedBytesToScan());
+ estimatedRows =
+ add(add(estimatedRows, lhsPlan.getEstimatedRowsToScan()),
+ rhsPlan.getEstimatedRowsToScan());
+ estimateInfoTs =
+ getMin(lhsPlan.getEstimateInfoTimestamp(), rhsPlan.getEstimateInfoTimestamp());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index e6bf654..3b5168c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -69,7 +69,7 @@ public class UnionPlan implements QueryPlan {
private Long estimatedRows;
private Long estimatedBytes;
private Long estimateInfoTs;
- private boolean explainPlanCalled;
+ private boolean getEstimatesCalled;
public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
Integer limit, Integer offset, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException {
@@ -174,7 +174,6 @@ public class UnionPlan implements QueryPlan {
@Override
public ExplainPlan getExplainPlan() throws SQLException {
- explainPlanCalled = true;
List<String> steps = new ArrayList<String>();
steps.add("UNION ALL OVER " + this.plans.size() + " QUERIES");
ResultIterator iterator = iterator();
@@ -184,23 +183,6 @@ public class UnionPlan implements QueryPlan {
for (int i = 1 ; i < steps.size()-offset; i++) {
steps.set(i, " " + steps.get(i));
}
- for (QueryPlan plan : plans) {
- if (plan.getEstimatedBytesToScan() == null || plan.getEstimatedRowsToScan() == null
- || plan.getEstimateInfoTimestamp() == null) {
- /*
- * If any of the sub plans doesn't have the estimate info available, then we don't
- * provide estimate for the overall plan
- */
- estimatedBytes = null;
- estimatedRows = null;
- estimateInfoTs = null;
- break;
- } else {
- estimatedBytes = add(estimatedBytes, plan.getEstimatedBytesToScan());
- estimatedRows = add(estimatedRows, plan.getEstimatedRowsToScan());
- estimateInfoTs = getMin(estimateInfoTs, plan.getEstimateInfoTimestamp());
- }
- }
return new ExplainPlan(steps);
}
@@ -265,25 +247,46 @@ public class UnionPlan implements QueryPlan {
@Override
public Long getEstimatedRowsToScan() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimatedRows;
}
@Override
public Long getEstimatedBytesToScan() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimatedBytes;
}
@Override
public Long getEstimateInfoTimestamp() throws SQLException {
- if (!explainPlanCalled) {
- getExplainPlan();
+ if (!getEstimatesCalled) {
+ getEstimates();
}
return estimateInfoTs;
}
+
+ private void getEstimates() throws SQLException {
+ getEstimatesCalled = true;
+ for (QueryPlan plan : plans) {
+ if (plan.getEstimatedBytesToScan() == null || plan.getEstimatedRowsToScan() == null
+ || plan.getEstimateInfoTimestamp() == null) {
+ /*
+ * If any of the sub plans doesn't have the estimate info available, then we don't
+ * provide estimate for the overall plan
+ */
+ estimatedBytes = null;
+ estimatedRows = null;
+ estimateInfoTs = null;
+ break;
+ } else {
+ estimatedBytes = add(estimatedBytes, plan.getEstimatedBytesToScan());
+ estimatedRows = add(estimatedRows, plan.getEstimatedRowsToScan());
+ estimateInfoTs = getMin(estimateInfoTs, plan.getEstimateInfoTimestamp());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 94aeb34..da8beae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -578,7 +578,14 @@ public class PhoenixStatement implements Statement, SQLCloseable {
@Override
public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
CompilableStatement compilableStmt = getStatement();
- final StatementPlan plan = compilableStmt.compilePlan(stmt, Sequence.ValueOp.VALIDATE_SEQUENCE);
+ StatementPlan compilePlan = compilableStmt.compilePlan(stmt, Sequence.ValueOp.VALIDATE_SEQUENCE);
+ // For a QueryPlan, we need to get its optimized plan; for a MutationPlan, its enclosed QueryPlan
+ // has already been optimized during compilation.
+ if (compilePlan instanceof QueryPlan) {
+ QueryPlan dataPlan = (QueryPlan) compilePlan;
+ compilePlan = stmt.getConnection().getQueryServices().getOptimizer().optimize(stmt, dataPlan);
+ }
+ final StatementPlan plan = compilePlan;
List<String> planSteps = plan.getExplainPlan().getPlanSteps();
List<Tuple> tuples = Lists.newArrayListWithExpectedSize(planSteps.size());
Long estimatedBytesToScan = plan.getEstimatedBytesToScan();
[3/9] phoenix git commit: PHOENIX-3050 Handle DESC columns in
child/parent join optimization
Posted by ma...@apache.org.
PHOENIX-3050 Handle DESC columns in child/parent join optimization
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/977699af
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/977699af
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/977699af
Branch: refs/heads/4.x-HBase-0.98
Commit: 977699afe0d66f1434b8bc1c5a751767e563d6ce
Parents: 92b57c7
Author: maryannxue <ma...@gmail.com>
Authored: Wed Dec 6 12:07:16 2017 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 17:17:16 2018 -0700
----------------------------------------------------------------------
.../phoenix/end2end/join/HashJoinMoreIT.java | 5 +++++
.../org/apache/phoenix/compile/JoinCompiler.java | 19 +++++++++++++------
.../apache/phoenix/compile/QueryCompiler.java | 6 +++---
.../apache/phoenix/compile/WhereOptimizer.java | 5 -----
4 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/977699af/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
index 37ffd02..f09f1d3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
@@ -895,6 +895,11 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
+ "FROM ( SELECT ACCOUNT_ID, BUCKET_ID, OBJECT_ID, MAX(OBJECT_VERSION) AS MAXVER "
+ " FROM test2961 GROUP BY ACCOUNT_ID, BUCKET_ID, OBJECT_ID) AS X "
+ " INNER JOIN test2961 AS OBJ ON X.ACCOUNT_ID = OBJ.ACCOUNT_ID AND X.BUCKET_ID = OBJ.BUCKET_ID AND X.OBJECT_ID = OBJ.OBJECT_ID AND X.MAXVER = OBJ.OBJECT_VERSION";
+ rs = conn.createStatement().executeQuery("explain " + q);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String dynamicFilter = "DYNAMIC SERVER FILTER BY (OBJ.ACCOUNT_ID, OBJ.BUCKET_ID, OBJ.OBJECT_ID, OBJ.OBJECT_VERSION) IN ((X.ACCOUNT_ID, X.BUCKET_ID, X.OBJECT_ID, X.MAXVER))";
+ assertTrue("Expected '" + dynamicFilter + "' to be used for the query, but got:\n" + plan,
+ plan.contains(dynamicFilter));
rs = conn.createStatement().executeQuery(q);
assertTrue(rs.next());
assertEquals("2222", rs.getString(4));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/977699af/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f9d8711..f3c4c24 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -108,6 +108,12 @@ import com.google.common.collect.Sets;
public class JoinCompiler {
+ public enum Strategy {
+ HASH_BUILD_LEFT,
+ HASH_BUILD_RIGHT,
+ SORT_MERGE,
+ }
+
public enum ColumnRefType {
JOINLOCAL,
GENERAL,
@@ -487,7 +493,7 @@ public class JoinCompiler {
return dependencies;
}
- public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, boolean sortExpressions) throws SQLException {
+ public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, Strategy strategy) throws SQLException {
if (onConditions.isEmpty()) {
return new Pair<List<Expression>, List<Expression>>(
Collections.<Expression> singletonList(LiteralExpression.newConstant(1)),
@@ -503,15 +509,16 @@ public class JoinCompiler {
rhsCompiler.reset();
Expression right = condition.getRHS().accept(rhsCompiler);
PDataType toType = getCommonType(left.getDataType(), right.getDataType());
- if (left.getDataType() != toType || left.getSortOrder() == SortOrder.DESC) {
- left = CoerceExpression.create(left, toType, SortOrder.ASC, left.getMaxLength());
+ SortOrder toSortOrder = strategy == Strategy.SORT_MERGE ? SortOrder.ASC : (strategy == Strategy.HASH_BUILD_LEFT ? right.getSortOrder() : left.getSortOrder());
+ if (left.getDataType() != toType || left.getSortOrder() != toSortOrder) {
+ left = CoerceExpression.create(left, toType, toSortOrder, left.getMaxLength());
}
- if (right.getDataType() != toType || right.getSortOrder() == SortOrder.DESC) {
- right = CoerceExpression.create(right, toType, SortOrder.ASC, right.getMaxLength());
+ if (right.getDataType() != toType || right.getSortOrder() != toSortOrder) {
+ right = CoerceExpression.create(right, toType, toSortOrder, right.getMaxLength());
}
compiled.add(new Pair<Expression, Expression>(left, right));
}
- if (sortExpressions) {
+ if (strategy != Strategy.SORT_MERGE) {
Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
@Override
public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/977699af/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 9443110..c5bc44c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -305,7 +305,7 @@ public class QueryCompiler {
JoinSpec joinSpec = joinSpecs.get(i);
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
- Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], true);
+ Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT);
joinExpressions[i] = joinConditions.getFirst();
List<Expression> hashExpressions = joinConditions.getSecond();
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
@@ -367,7 +367,7 @@ public class QueryCompiler {
context.setCurrentTable(rhsTableRef);
context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
- Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT);
List<Expression> joinExpressions = joinConditions.getSecond();
List<Expression> hashExpressions = joinConditions.getFirst();
boolean needsMerge = lhsJoin.hasPostReference();
@@ -420,7 +420,7 @@ public class QueryCompiler {
QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
- Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, false);
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE);
List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/977699af/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index ccf073a..87f00e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -326,11 +326,6 @@ public class WhereOptimizer {
PTable table = context.getCurrentTable().getTable();
for (int i = 0; i < expressions.size(); i++) {
Expression expression = expressions.get(i);
- // TODO this is a temporary fix for PHOENIX-3029.
- if (expression instanceof CoerceExpression
- && expression.getSortOrder() != expression.getChildren().get(0).getSortOrder()) {
- continue;
- }
KeyExpressionVisitor visitor = new KeyExpressionVisitor(context, table);
KeyExpressionVisitor.KeySlots keySlots = expression.accept(visitor);
int minPkPos = Integer.MAX_VALUE;
[2/9] phoenix git commit: PHOENIX-4322 DESC primary key column with
variable length does not work in SkipScanFilter
Posted by ma...@apache.org.
PHOENIX-4322 DESC primary key column with variable length does not work in SkipScanFilter
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/92b57c78
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/92b57c78
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/92b57c78
Branch: refs/heads/4.x-HBase-0.98
Commit: 92b57c7893c91d90d78e30171e233043dbcb4583
Parents: 541d6ac
Author: maryannxue <ma...@gmail.com>
Authored: Tue Dec 5 10:59:41 2017 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 17:16:42 2018 -0700
----------------------------------------------------------------------
.../it/java/org/apache/phoenix/end2end/SortOrderIT.java | 11 ++++++++++-
.../expression/RowValueConstructorExpression.java | 4 ++--
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/92b57c78/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
index 655dbb1..3f749c1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
@@ -167,7 +167,16 @@ public class SortOrderIT extends ParallelStatsDisabledIT {
runQueryTest(ddl, upsert("oid", "code"), insertedRows, new Object[][]{{"o2", 2}}, new WhereCondition("oid", "IN", "('o2')"),
table);
}
-
+
+ @Test
+ public void inDescCompositePK3() throws Exception {
+ String table = generateUniqueName();
+ String ddl = "CREATE table " + table + " (oid VARCHAR NOT NULL, code VARCHAR NOT NULL constraint pk primary key (oid DESC, code DESC))";
+ Object[][] insertedRows = new Object[][]{{"o1", "1"}, {"o2", "2"}, {"o3", "3"}};
+ runQueryTest(ddl, upsert("oid", "code"), insertedRows, new Object[][]{{"o2", "2"}, {"o1", "1"}}, new WhereCondition("(oid, code)", "IN", "(('o2', '2'), ('o1', '1'))"),
+ table);
+ }
+
@Test
public void likeDescCompositePK1() throws Exception {
String table = generateUniqueName();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/92b57c78/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
index 15f6e3e..9bb7234 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
@@ -199,8 +199,8 @@ public class RowValueConstructorExpression extends BaseCompoundExpression {
// as otherwise we need it to ensure sort order is correct
for (int k = expressionCount -1 ;
k >=0 && getChildren().get(k).getDataType() != null
- && !getChildren().get(k).getDataType().isFixedWidth()
- && outputBytes[outputSize-1] == QueryConstants.SEPARATOR_BYTE ; k--) {
+ && !getChildren().get(k).getDataType().isFixedWidth()
+ && outputBytes[outputSize-1] == SchemaUtil.getSeparatorByte(true, false, getChildren().get(k)) ; k--) {
outputSize--;
}
ptr.set(outputBytes, 0, outputSize);
[7/9] phoenix git commit: PHOENIX-1556 Base hash versus sort merge
join decision on cost
Posted by ma...@apache.org.
PHOENIX-1556 Base hash versus sort merge join decision on cost
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6914d54d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6914d54d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6914d54d
Branch: refs/heads/4.x-HBase-0.98
Commit: 6914d54d99b4fafae44d1a3397c44ba6e5d10368
Parents: 2c75823
Author: maryannxue <ma...@gmail.com>
Authored: Mon Feb 12 14:07:30 2018 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 21:19:01 2018 -0700
----------------------------------------------------------------------
.../phoenix/end2end/CostBasedDecisionIT.java | 420 ++++++++++++-----
.../apache/phoenix/compile/JoinCompiler.java | 43 ++
.../phoenix/compile/ListJarsQueryPlan.java | 8 +-
.../apache/phoenix/compile/QueryCompiler.java | 455 ++++++++++---------
.../org/apache/phoenix/compile/QueryPlan.java | 2 +
.../apache/phoenix/compile/TraceQueryPlan.java | 6 +
.../apache/phoenix/execute/AggregatePlan.java | 41 +-
.../phoenix/execute/ClientAggregatePlan.java | 46 +-
.../phoenix/execute/ClientProcessingPlan.java | 4 +
.../apache/phoenix/execute/ClientScanPlan.java | 22 +-
.../apache/phoenix/execute/CorrelatePlan.java | 26 +-
.../apache/phoenix/execute/CursorFetchPlan.java | 6 +
.../apache/phoenix/execute/HashJoinPlan.java | 128 ++++--
.../execute/LiteralResultIterationPlan.java | 6 +
.../org/apache/phoenix/execute/ScanPlan.java | 14 +-
.../phoenix/execute/SortMergeJoinPlan.java | 20 +-
.../phoenix/execute/TupleProjectionPlan.java | 6 +
.../org/apache/phoenix/execute/UnionPlan.java | 12 +-
.../apache/phoenix/execute/UnnestArrayPlan.java | 6 +
.../execute/visitor/AvgRowWidthVisitor.java | 205 +++++++++
.../execute/visitor/ByteCountVisitor.java | 125 +++++
.../execute/visitor/QueryPlanVisitor.java | 46 ++
.../execute/visitor/RowCountVisitor.java | 335 ++++++++++++++
.../apache/phoenix/jdbc/PhoenixStatement.java | 6 +
.../java/org/apache/phoenix/util/CostUtil.java | 61 +--
.../query/ParallelIteratorsSplitTest.java | 6 +
26 files changed, 1615 insertions(+), 440 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
index a3584ce..493855a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -32,12 +32,16 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Maps;
public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+ private final String testTable500;
+ private final String testTable990;
+ private final String testTable1000;
@BeforeClass
public static void doSetup() throws Exception {
@@ -46,9 +50,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true));
+ props.put(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, Long.toString(150000));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
+ public CostBasedDecisionIT() throws Exception {
+ testTable500 = initTestTableValues(500);
+ testTable990 = initTestTableValues(990);
+ testTable1000 = initTestTableValues(1000);
+ }
+
@Test
public void testCostOverridesStaticPlanOrdering1() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -64,10 +75,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey";
// Use the data table plan that opts out order-by when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
- plan.contains("FULL SCAN"));
+ verifyQueryPlan(query, "FULL SCAN");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -81,10 +89,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the index table plan that has a lower cost when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
- plan.contains("RANGE SCAN"));
+ verifyQueryPlan(query, "RANGE SCAN");
} finally {
conn.close();
}
@@ -103,12 +108,12 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
"c2 VARCHAR)");
conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
- String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+ String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1";
// Use the index table plan that opts out order-by when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
- plan.contains("RANGE SCAN"));
+ verifyQueryPlan(query,
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+ "CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -124,10 +129,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
// Given that the range on C1 is meaningless and group-by becomes
// order-preserving if using the data table, the data table plan should
// come out as the best plan based on the costs.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
- plan.contains("FULL SCAN"));
+ verifyQueryPlan(query,
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+ "CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -150,14 +156,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
// Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String indexPlan =
+ verifyQueryPlan(query,
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
" SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ "CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -171,14 +173,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the idx2 plan that scans less data when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String dataPlan =
+ verifyQueryPlan(query,
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
" SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
- plan.contains(dataPlan));
+ "CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -201,15 +199,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
// Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String indexPlan =
+ verifyQueryPlan(query,
"UPSERT SELECT\n" +
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
" SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ "CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -223,15 +217,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the idx2 plan that scans less data when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String dataPlan =
+ verifyQueryPlan(query,
"UPSERT SELECT\n" +
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
" SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
- plan.contains(dataPlan));
+ "CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -254,15 +244,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
// Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String indexPlan =
+ verifyQueryPlan(query,
"DELETE ROWS\n" +
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
" SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ "CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -276,15 +262,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the idx2 plan that scans less data when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String dataPlan =
+ verifyQueryPlan(query,
"DELETE ROWS\n" +
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
" SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
- "CLIENT MERGE SORT";
- assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
- plan.contains(dataPlan));
+ "CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -303,22 +285,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
"c2 VARCHAR)");
conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
- String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 "
- + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+ String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1 "
+ + "UNION ALL SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey >= 'a' GROUP BY c1";
// Use the default plan when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String defaultPlan =
+ verifyQueryPlan(query,
"UNION ALL OVER 2 QUERIES\n" +
- " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
" SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
" CLIENT MERGE SORT\n" +
- " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
- " SERVER FILTER BY FIRST KEY ONLY\n" +
- " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
- " CLIENT MERGE SORT";
- assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
- plan.contains(defaultPlan));
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['a'] - [*]\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+ " CLIENT MERGE SORT");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -332,19 +309,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the optimal plan based on cost when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String optimizedPlan =
+ verifyQueryPlan(query,
"UNION ALL OVER 2 QUERIES\n" +
" CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
- " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
" CLIENT MERGE SORT\n" +
- " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
- " SERVER FILTER BY C1 LIKE 'X%'\n" +
- " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]";
- assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
- plan.contains(optimizedPlan));
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" >= 'a'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+ " CLIENT MERGE SORT");
} finally {
conn.close();
}
@@ -363,23 +337,18 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
"c2 VARCHAR)");
conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
- String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 "
- + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
- + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
+ String query = "SELECT t1.rowkey, t1.c1, t1.c2, t2.c1, mc2 FROM " + tableName + " t1 "
+ + "JOIN (SELECT c1, max(rowkey) mrk, max(c2) mc2 FROM " + tableName + " where rowkey <= 'z' GROUP BY c1) t2 "
+ + "ON t1.rowkey = t2.mrk WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
// Use the default plan when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- String defaultPlan =
+ verifyQueryPlan(query,
"CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
" SERVER FILTER BY C1 LIKE 'X0%'\n" +
" PARALLEL INNER-JOIN TABLE 0\n" +
- " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
- " SERVER FILTER BY FIRST KEY ONLY\n" +
- " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
" CLIENT MERGE SORT\n" +
- " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
- assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
- plan.contains(defaultPlan));
+ " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.MRK)");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -393,20 +362,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the optimal plan based on cost when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- String optimizedPlan =
+ verifyQueryPlan(query,
"CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" +
" SERVER FILTER BY FIRST KEY ONLY\n" +
" SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
"CLIENT MERGE SORT\n" +
" PARALLEL INNER-JOIN TABLE 0\n" +
- " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
- " SERVER FILTER BY C1 LIKE 'X%'\n" +
- " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" +
- " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)";
- assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
- plan.contains(optimizedPlan));
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.MRK)");
} finally {
conn.close();
}
@@ -432,10 +398,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
// Use the index table plan that opts out order-by when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query);
- String plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ verifyQueryPlan(query, indexPlan);
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
for (int i = 0; i < 10000; i++) {
@@ -449,18 +412,261 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
// Use the data table plan that has a lower cost when stats are available.
- rs = conn.createStatement().executeQuery("explain " + query);
- plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
- plan.contains(dataPlan));
+ verifyQueryPlan(query, dataPlan);
// Use the index table plan as has been hinted.
- rs = conn.createStatement().executeQuery("explain " + hintedQuery);
- plan = QueryUtil.getExplainPlan(rs);
- assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
- plan.contains(indexPlan));
+ verifyQueryPlan(hintedQuery, indexPlan);
} finally {
conn.close();
}
}
+
+ /** Sort-merge-join w/ both children ordered wins over hash-join. */
+ @Test
+ public void testJoinStrategy() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.ID";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000;
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Sort-merge-join w/ both children ordered wins over hash-join in an un-grouped aggregate query. */
+ @Test
+ public void testJoinStrategy2() throws Exception {
+ String q = "SELECT count(*)\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.ID\n" +
+ "WHERE t1.COL1 < 200";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+ " SERVER FILTER BY COL1 < 200\n" +
+ "AND (SKIP MERGE)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ "CLIENT AGGREGATE INTO SINGLE ROW";
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Hash-join w/ PK/FK optimization wins over sort-merge-join w/ larger side ordered. */
+ @Test
+ public void testJoinStrategy3() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.COL1 = t2.ID\n" +
+ "WHERE t1.ID > 200";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+ " DYNAMIC SERVER FILTER BY T2.ID IN (T1.COL1)";
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Hash-join w/ PK/FK optimization wins over hash-join w/o PK/FK optimization when two sides are close in size. */
+ @Test
+ public void testJoinStrategy4() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable990 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.COL1";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)";
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Hash-join wins over sort-merge-join w/ smaller side ordered. */
+ @Test
+ public void testJoinStrategy5() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.COL1\n" +
+ "WHERE t1.ID > 200";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /** Hash-join wins over sort-merge-join w/o any side ordered. */
+ @Test
+ public void testJoinStrategy6() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.COL1 = t2.COL1\n" +
+ "WHERE t1.ID > 200";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Hash-join wins over sort-merge-join w/ both sides ordered in an order-by query.
+ * This is because order-by can only be done on the client side after sort-merge-join
+ * and order-by w/o limit on the client side is very expensive.
+ */
+ @Test
+ public void testJoinStrategy7() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.ID\n" +
+ "ORDER BY t1.COL1";
+ String expected =
+ "CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " SERVER SORTED BY [T1.COL1]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+ " DYNAMIC SERVER FILTER BY T2.ID IN (T1.ID)";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Sort-merge-join w/ both sides ordered wins over hash-join in an order-by limit query.
+ * This is because order-by can only be done on the client side after sort-merge-join
+ * but order-by w/ limit on the client side is less expensive.
+ */
+ @Test
+ public void testJoinStrategy8() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+ "ON t1.ID = t2.ID\n" +
+ "ORDER BY t1.COL1 LIMIT 5";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ "CLIENT TOP 5 ROWS SORTED BY [T1.COL1]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Multi-table join: sort-merge-join chosen since all join keys are PK.
+ */
+ @Test
+ public void testJoinStrategy9() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable1000 + " t1 LEFT JOIN " + testTable500 + " t2\n" +
+ "ON t1.ID = t2.ID AND t2.ID > 200\n" +
+ "LEFT JOIN " + testTable990 + " t3\n" +
+ "ON t1.ID = t3.ID AND t3.ID < 100";
+ String expected =
+ "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+ " SORT-MERGE-JOIN (LEFT) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " AND\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Multi-table join: a mix of join strategies chosen based on cost.
+ */
+ @Test
+ public void testJoinStrategy10() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" +
+ "ON t1.ID = t2.COL1 AND t2.ID > 200\n" +
+ "JOIN " + testTable990 + " t3\n" +
+ "ON t1.ID = t3.ID AND t3.ID < 100";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+ " DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Multi-table join: hash-join two tables in parallel since two RHS tables are both small
+ * and can fit in memory at the same time.
+ */
+ @Test
+ public void testJoinStrategy11() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" +
+ "ON t1.COL2 = t2.COL1 AND t2.ID > 200\n" +
+ "JOIN " + testTable990 + " t3\n" +
+ "ON t1.COL1 = t3.COL2 AND t3.ID < 100";
+ String expected =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+ " PARALLEL INNER-JOIN TABLE 1\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+ verifyQueryPlan(q, expected);
+ }
+
+ /**
+ * Multi-table join: similar to {@link #testJoinStrategy11()}, but the two RHS
+ * tables cannot fit in memory at the same time, and thus a mix of join strategies
+ * is chosen based on cost.
+ */
+ @Test
+ public void testJoinStrategy12() throws Exception {
+ String q = "SELECT *\n" +
+ "FROM " + testTable1000 + " t1 JOIN " + testTable990 + " t2\n" +
+ "ON t1.COL2 = t2.COL1\n" +
+ "JOIN " + testTable990 + " t3\n" +
+ "ON t1.COL1 = t3.COL2";
+ String expected =
+ "SORT-MERGE-JOIN (INNER) TABLES\n" +
+ " CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+ " SERVER SORTED BY [T1.COL1]\n" +
+ " CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" +
+ "AND\n" +
+ " CLIENT PARALLEL 991-WAY FULL SCAN OVER " + testTable990 + "\n" +
+ " SERVER SORTED BY [T3.COL2]\n" +
+ " CLIENT MERGE SORT";
+ verifyQueryPlan(q, expected);
+ }
+
+ private static void verifyQueryPlan(String query, String expected) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected '" + expected + "' in the plan:\n" + plan + ".",
+ plan.contains(expected));
+ }
+
+ private static String initTestTableValues(int rows) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String tableName = generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "ID INTEGER NOT NULL PRIMARY KEY,\n" +
+ "COL1 INTEGER," +
+ "COL2 INTEGER)");
+ PreparedStatement stmt = conn.prepareStatement(
+ "UPSERT INTO " + tableName + " VALUES(?, ?, ?)");
+ for (int i = 0; i < rows; i++) {
+ stmt.setInt(1, i + 1);
+ stmt.setInt(2, rows - i);
+ stmt.setInt(3, rows + i);
+ stmt.execute();
+ }
+ conn.commit();
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+ return tableName;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f3c4c24..f5a7e39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -71,6 +71,8 @@ import org.apache.phoenix.parse.TableNodeVisitor;
import org.apache.phoenix.parse.TableWildcardParseNode;
import org.apache.phoenix.parse.UDFParseNode;
import org.apache.phoenix.parse.WildcardParseNode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.LocalIndexDataColumnRef;
@@ -124,6 +126,8 @@ public class JoinCompiler {
private final ColumnResolver origResolver;
private final boolean useStarJoin;
private final Map<ColumnRef, ColumnRefType> columnRefs;
+ private final boolean useSortMergeJoin;
+ private final boolean costBased;
private JoinCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) {
@@ -132,6 +136,9 @@ public class JoinCompiler {
this.origResolver = resolver;
this.useStarJoin = !select.getHint().hasHint(Hint.NO_STAR_JOIN);
this.columnRefs = new HashMap<ColumnRef, ColumnRefType>();
+ this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
+ this.costBased = statement.getConnection().getQueryServices().getProps().getBoolean(
+ QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
}
public static JoinTable compile(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
@@ -365,6 +372,42 @@ public class JoinCompiler {
}
/**
+ * Return a list of all applicable join strategies. The order of the strategies in the
+ * returned list is based on the static rule below. However, the caller can decide on
+ * an optimal join strategy by evaluating and comparing the costs.
+ * 1. If hint USE_SORT_MERGE_JOIN is specified,
+ * return a singleton list containing only SORT_MERGE.
+ * 2. If 1) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B"; or
+ * 2) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B (LEFT/INNER/SEMI/ANTI JOIN C)+"
+ * and hint NO_STAR_JOIN is not specified,
+ * add BUILD_RIGHT to the returned list.
+ * 3. If matches pattern "A RIGHT/INNER JOIN B", where B is either a named table reference
+ * or a flat sub-query,
+ * add BUILD_LEFT to the returned list.
+ * 4. add SORT_MERGE to the returned list.
+ */
+ public List<Strategy> getApplicableJoinStrategies() {
+ List<Strategy> strategies = Lists.newArrayList();
+ if (useSortMergeJoin) {
+ strategies.add(Strategy.SORT_MERGE);
+ } else {
+ if (getStarJoinVector() != null) {
+ strategies.add(Strategy.HASH_BUILD_RIGHT);
+ }
+ JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+ JoinType type = lastJoinSpec.getType();
+ if ((type == JoinType.Right || type == JoinType.Inner)
+ && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
+ && lastJoinSpec.getJoinTable().getTable().isFlat()) {
+ strategies.add(Strategy.HASH_BUILD_LEFT);
+ }
+ strategies.add(Strategy.SORT_MERGE);
+ }
+
+ return strategies;
+ }
+
+ /**
* Returns a boolean vector indicating whether the evaluation of join expressions
* can be evaluated at an early stage if the input JoinSpec can be taken as a
* star join. Otherwise returns null.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index fa48e52..e536ae9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
@@ -251,7 +252,12 @@ public class ListJarsQueryPlan implements QueryPlan {
return false;
}
- @Override
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Set<TableRef> getSourceRefs() {
return Collections.<TableRef>emptySet();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index c5bc44c..c8650b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -52,6 +53,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.EqualParseNode;
import org.apache.phoenix.parse.HintNode.Hint;
@@ -63,7 +65,10 @@ import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SubqueryParseNode;
import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PDatum;
@@ -102,10 +107,10 @@ public class QueryCompiler {
private final ParallelIteratorFactory parallelIteratorFactory;
private final SequenceManager sequenceManager;
private final boolean projectTuples;
- private final boolean useSortMergeJoin;
private final boolean noChildParentJoinOptimization;
private final QueryPlan dataPlan;
-
+ private final boolean costBased;
+
public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan);
}
@@ -119,9 +124,10 @@ public class QueryCompiler {
this.parallelIteratorFactory = parallelIteratorFactory;
this.sequenceManager = sequenceManager;
this.projectTuples = projectTuples;
- this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION);
- if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
+ ConnectionQueryServices services = statement.getConnection().getQueryServices();
+ this.costBased = services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+ if (services.getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
}
if (select.getHint().hasHint(Hint.NO_CACHE)) {
@@ -201,41 +207,17 @@ public class QueryCompiler {
}
}
- /*
+ /**
* Call compileJoinQuery() for join queries recursively down to the leaf JoinTable nodes.
- * This matches the input JoinTable node against patterns in the following order:
- * 1. A (leaf JoinTable node, which can be a named table reference or a subquery of any kind.)
- * Returns the compilation result of a single table scan or of an independent subquery.
- * 2. Matching either of (when hint USE_SORT_MERGE_JOIN not specified):
- * 1) A LEFT/INNER JOIN B
- * 2) A LEFT/INNER JOIN B (LEFT/INNER JOIN C)+, if hint NO_STAR_JOIN not specified
- * where A can be a named table reference or a flat subquery, and B, C, ... can be a named
- * table reference, a sub-join or a subquery of any kind.
- * Returns a HashJoinPlan{scan: A, hash: B, C, ...}.
- * 3. Matching pattern:
- * A RIGHT/INNER JOIN B (when hint USE_SORT_MERGE_JOIN not specified)
- * where B can be a named table reference or a flat subquery, and A can be a named table
- * reference, a sub-join or a subquery of any kind.
- * Returns a HashJoinPlan{scan: B, hash: A}.
- * NOTE that "A LEFT/RIGHT/INNER/FULL JOIN B RIGHT/INNER JOIN C" is viewed as
- * "(A LEFT/RIGHT/INNER/FULL JOIN B) RIGHT/INNER JOIN C" here, which means the left part in the
- * parenthesis is considered a sub-join.
- * viewed as a sub-join.
- * 4. All the rest that do not qualify for previous patterns or conditions, including FULL joins.
- * Returns a SortMergeJoinPlan, the sorting part of which is pushed down to the JoinTable nodes
- * of both sides as order-by clauses.
- * NOTE that SEMI or ANTI joins are treated the same way as LEFT joins in JoinTable pattern matching.
- *
- * If no join algorithm hint is provided, according to the above compilation process, a join query
- * plan can probably consist of both HashJoinPlan and SortMergeJoinPlan which may enclose each other.
- * TODO 1) Use table statistics to guide the choice of join plans.
- * 2) Make it possible to hint a certain join algorithm for a specific join step.
+ * If it is a leaf node, call compileSingleFlatQuery() or compileSubquery(), otherwise:
+ * 1) If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, return the
+ * join plan with the best cost. Note that the "best" plan is only locally optimal,
+ * and might or might not be globally optimal.
+ * 2) Otherwise, return the join plan compiled with the default strategy.
+ * @see JoinCompiler.JoinTable#getApplicableJoinStrategies()
*/
- @SuppressWarnings("unchecked")
protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
- byte[] emptyByteArray = new byte[0];
- List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
- if (joinSpecs.isEmpty()) {
+ if (joinTable.getJoinSpecs().isEmpty()) {
Table table = joinTable.getTable();
SelectStatement subquery = table.getAsSubquery(orderBy);
if (!table.isSubselect()) {
@@ -252,199 +234,230 @@ public class QueryCompiler {
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), table.compilePostFilterExpression(context));
}
-
- boolean[] starJoinVector;
- if (!this.useSortMergeJoin && (starJoinVector = joinTable.getStarJoinVector()) != null) {
- Table table = joinTable.getTable();
- PTable initialProjectedTable;
- TableRef tableRef;
- SelectStatement query;
- TupleProjector tupleProjector;
- if (!table.isSubselect()) {
- context.setCurrentTable(table.getTableRef());
- initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
- tableRef = table.getTableRef();
- table.projectColumns(context.getScan());
- query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
- tupleProjector = new TupleProjector(initialProjectedTable);
- } else {
- SelectStatement subquery = table.getAsSubquery(orderBy);
- QueryPlan plan = compileSubquery(subquery, false);
- initialProjectedTable = table.createProjectedTable(plan.getProjector());
- tableRef = plan.getTableRef();
- context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
- query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
- tupleProjector = new TupleProjector(plan.getProjector());
+
+ List<JoinCompiler.Strategy> strategies = joinTable.getApplicableJoinStrategies();
+ assert strategies.size() > 0;
+ if (!costBased || strategies.size() == 1) {
+ return compileJoinQuery(
+ strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+ }
+
+ QueryPlan bestPlan = null;
+ Cost bestCost = null;
+ for (JoinCompiler.Strategy strategy : strategies) {
+ StatementContext newContext = new StatementContext(
+ context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager());
+ QueryPlan plan = compileJoinQuery(
+ strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+ Cost cost = plan.getCost();
+ if (bestPlan == null || cost.compareTo(bestCost) < 0) {
+ bestPlan = plan;
+ bestCost = cost;
}
- context.setCurrentTable(tableRef);
- PTable projectedTable = initialProjectedTable;
- int count = joinSpecs.size();
- ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
- List<Expression>[] joinExpressions = new List[count];
- JoinType[] joinTypes = new JoinType[count];
- PTable[] tables = new PTable[count];
- int[] fieldPositions = new int[count];
- StatementContext[] subContexts = new StatementContext[count];
- QueryPlan[] subPlans = new QueryPlan[count];
- HashSubPlan[] hashPlans = new HashSubPlan[count];
- fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
- for (int i = 0; i < count; i++) {
- JoinSpec joinSpec = joinSpecs.get(i);
- Scan subScan = ScanUtil.newScan(originalScan);
- subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
- boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
- if (hasPostReference) {
- tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
- projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
+ }
+ context.setResolver(bestPlan.getContext().getResolver());
+ context.setCurrentTable(bestPlan.getContext().getCurrentTable());
+ return bestPlan;
+ }
+
+ protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
+ byte[] emptyByteArray = new byte[0];
+ List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
+ switch (strategy) {
+ case HASH_BUILD_RIGHT: {
+ boolean[] starJoinVector = joinTable.getStarJoinVector();
+ Table table = joinTable.getTable();
+ PTable initialProjectedTable;
+ TableRef tableRef;
+ SelectStatement query;
+ TupleProjector tupleProjector;
+ if (!table.isSubselect()) {
+ context.setCurrentTable(table.getTableRef());
+ initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
+ tableRef = table.getTableRef();
+ table.projectColumns(context.getScan());
+ query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
+ tupleProjector = new TupleProjector(initialProjectedTable);
} else {
- tables[i] = null;
+ SelectStatement subquery = table.getAsSubquery(orderBy);
+ QueryPlan plan = compileSubquery(subquery, false);
+ initialProjectedTable = table.createProjectedTable(plan.getProjector());
+ tableRef = plan.getTableRef();
+ context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+ query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+ tupleProjector = new TupleProjector(plan.getProjector());
}
- }
- for (int i = 0; i < count; i++) {
- JoinSpec joinSpec = joinSpecs.get(i);
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
- joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
- Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT);
- joinExpressions[i] = joinConditions.getFirst();
- List<Expression> hashExpressions = joinConditions.getSecond();
- Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
- boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
- Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
- Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
- joinTypes[i] = joinSpec.getType();
- if (i < count - 1) {
- fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+ context.setCurrentTable(tableRef);
+ PTable projectedTable = initialProjectedTable;
+ int count = joinSpecs.size();
+ ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
+ List<Expression>[] joinExpressions = new List[count];
+ JoinType[] joinTypes = new JoinType[count];
+ PTable[] tables = new PTable[count];
+ int[] fieldPositions = new int[count];
+ StatementContext[] subContexts = new StatementContext[count];
+ QueryPlan[] subPlans = new QueryPlan[count];
+ HashSubPlan[] hashPlans = new HashSubPlan[count];
+ fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
+ for (int i = 0; i < count; i++) {
+ JoinSpec joinSpec = joinSpecs.get(i);
+ Scan subScan = ScanUtil.newScan(originalScan);
+ subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+ subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
+ boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
+ if (hasPostReference) {
+ tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
+ projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
+ } else {
+ tables[i] = null;
+ }
}
- hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
- }
- TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
- QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
- Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
- Integer limit = null;
- Integer offset = null;
- if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
- limit = plan.getLimit();
- offset = plan.getOffset();
- }
- HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
- starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
- return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
- }
-
- JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
- JoinType type = lastJoinSpec.getType();
- if (!this.useSortMergeJoin
- && (type == JoinType.Right || type == JoinType.Inner)
- && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
- && lastJoinSpec.getJoinTable().getTable().isFlat()) {
- JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
- Table rhsTable = rhsJoinTable.getTable();
- JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
- Scan subScan = ScanUtil.newScan(originalScan);
- StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
- PTable rhsProjTable;
- TableRef rhsTableRef;
- SelectStatement rhs;
- TupleProjector tupleProjector;
- if (!rhsTable.isSubselect()) {
- context.setCurrentTable(rhsTable.getTableRef());
- rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
- rhsTableRef = rhsTable.getTableRef();
- rhsTable.projectColumns(context.getScan());
- rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
- tupleProjector = new TupleProjector(rhsProjTable);
- } else {
- SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
- QueryPlan plan = compileSubquery(subquery, false);
- rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
- rhsTableRef = plan.getTableRef();
- context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
- rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
- tupleProjector = new TupleProjector(plan.getProjector());
+ for (int i = 0; i < count; i++) {
+ JoinSpec joinSpec = joinSpecs.get(i);
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
+ joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
+ Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], strategy);
+ joinExpressions[i] = joinConditions.getFirst();
+ List<Expression> hashExpressions = joinConditions.getSecond();
+ Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
+ boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
+ Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
+ Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
+ joinTypes[i] = joinSpec.getType();
+ if (i < count - 1) {
+ fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+ }
+ hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+ }
+ TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+ QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+ Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
+ Integer limit = null;
+ Integer offset = null;
+ if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
+ limit = plan.getLimit();
+ offset = plan.getOffset();
+ }
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
+ starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+ return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
}
- context.setCurrentTable(rhsTableRef);
- context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
- ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
- Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT);
- List<Expression> joinExpressions = joinConditions.getSecond();
- List<Expression> hashExpressions = joinConditions.getFirst();
- boolean needsMerge = lhsJoin.hasPostReference();
- PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
- int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
- PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
- TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
- QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
- Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
- Integer limit = null;
- Integer offset = null;
- if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
- limit = rhsPlan.getLimit();
- offset = rhsPlan.getOffset();
+ case HASH_BUILD_LEFT: {
+ JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+ JoinType type = lastJoinSpec.getType();
+ JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
+ Table rhsTable = rhsJoinTable.getTable();
+ JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+ Scan subScan = ScanUtil.newScan(originalScan);
+ StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
+ PTable rhsProjTable;
+ TableRef rhsTableRef;
+ SelectStatement rhs;
+ TupleProjector tupleProjector;
+ if (!rhsTable.isSubselect()) {
+ context.setCurrentTable(rhsTable.getTableRef());
+ rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
+ rhsTableRef = rhsTable.getTableRef();
+ rhsTable.projectColumns(context.getScan());
+ rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
+ tupleProjector = new TupleProjector(rhsProjTable);
+ } else {
+ SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
+ QueryPlan plan = compileSubquery(subquery, false);
+ rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
+ rhsTableRef = plan.getTableRef();
+ context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+ rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+ tupleProjector = new TupleProjector(plan.getProjector());
+ }
+ context.setCurrentTable(rhsTableRef);
+ context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
+ ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[]{new ImmutableBytesPtr(emptyByteArray)};
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, strategy);
+ List<Expression> joinExpressions = joinConditions.getSecond();
+ List<Expression> hashExpressions = joinConditions.getFirst();
+ boolean needsMerge = lhsJoin.hasPostReference();
+ PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
+ int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
+ PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
+ TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
+ QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+ Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
+ Integer limit = null;
+ Integer offset = null;
+ if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
+ limit = rhsPlan.getLimit();
+ offset = rhsPlan.getOffset();
+ }
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions},
+ new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true},
+ new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+ Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
+ getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
+ return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
}
- HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] { joinExpressions },
- new JoinType[] { type == JoinType.Right ? JoinType.Left : type }, new boolean[] { true },
- new PTable[] { lhsTable }, new int[] { fieldPosition }, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
- Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
- getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
- return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
- }
-
- JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
- JoinTable rhsJoin = lastJoinSpec.getJoinTable();
- if (type == JoinType.Right) {
- JoinTable temp = lhsJoin;
- lhsJoin = rhsJoin;
- rhsJoin = temp;
- }
-
- List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
- List<OrderByNode> lhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
- List<OrderByNode> rhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
- for (EqualParseNode condition : joinConditionNodes) {
- lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
- rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
- }
-
- Scan lhsScan = ScanUtil.newScan(originalScan);
- StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
- boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
- PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
- boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
-
- Scan rhsScan = ScanUtil.newScan(originalScan);
- StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
- QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
- PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
-
- Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE);
- List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
- List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
-
- boolean needsMerge = rhsJoin.hasPostReference();
- int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
- PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
+ case SORT_MERGE: {
+ JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+ JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+ JoinType type = lastJoinSpec.getType();
+ JoinTable rhsJoin = lastJoinSpec.getJoinTable();
+ if (type == JoinType.Right) {
+ JoinTable temp = lhsJoin;
+ lhsJoin = rhsJoin;
+ rhsJoin = temp;
+ }
- ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
- TableRef tableRef = resolver.getTables().get(0);
- StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
- subCtx.setCurrentTable(tableRef);
- QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
- context.setCurrentTable(tableRef);
- context.setResolver(resolver);
- TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
- ParseNode where = joinTable.getPostFiltersCombined();
- SelectStatement select = asSubquery
- ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
- Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, null, 0, false,
- joinTable.getStatement().hasSequence(), Collections.<SelectStatement> emptyList(),
+ List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
+ List<OrderByNode> lhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size());
+ List<OrderByNode> rhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size());
+ for (EqualParseNode condition : joinConditionNodes) {
+ lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
+ rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
+ }
+
+ Scan lhsScan = ScanUtil.newScan(originalScan);
+ StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
+ boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
+ PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
+ boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
+
+ Scan rhsScan = ScanUtil.newScan(originalScan);
+ StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
+ QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
+ PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
+
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy);
+ List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
+ List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
+
+ boolean needsMerge = rhsJoin.hasPostReference();
+ int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
+ PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
+
+ ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
+ TableRef tableRef = resolver.getTables().get(0);
+ StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
+ subCtx.setCurrentTable(tableRef);
+ QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
+ context.setCurrentTable(tableRef);
+ context.setResolver(resolver);
+ TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
+ ParseNode where = joinTable.getPostFiltersCombined();
+ SelectStatement select = asSubquery
+ ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
+ Collections.<AliasedNode>emptyList(), where, null, null, orderBy, null, null, 0, false,
+ joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(),
joinTable.getStatement().getUdfParseNodes())
- : NODE_FACTORY.select(joinTable.getStatement(), from, where);
-
- return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+ : NODE_FACTORY.select(joinTable.getStatement(), from, where);
+
+ return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+ }
+ default:
+ throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
+ }
}
private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, SelectStatement select, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index ca88984..c2edaf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.optimize.Cost;
@@ -90,4 +91,5 @@ public interface QueryPlan extends StatementPlan {
*/
public boolean useRoundRobinIterator() throws SQLException;
+ <T> T accept(QueryPlanVisitor<T> visitor);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 5cd1d08..18060f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
@@ -276,6 +277,11 @@ public class TraceQueryPlan implements QueryPlan {
}
@Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
public Long getEstimatedRowsToScan() {
return 0l;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 2e042e7..0c8e8dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -33,6 +33,10 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.expression.RowKeyExpression;
@@ -117,25 +121,39 @@ public class AggregatePlan extends BaseQueryPlan {
@Override
public Cost getCost() {
- Long byteCount = null;
+ Double outputBytes = this.accept(new ByteCountVisitor());
+ Double rowWidth = this.accept(new AvgRowWidthVisitor());
+ Long inputRows = null;
try {
- byteCount = getEstimatedBytesToScan();
+ inputRows = getEstimatedRowsToScan();
} catch (SQLException e) {
// ignored.
}
-
- if (byteCount == null) {
+ if (inputRows == null || outputBytes == null || rowWidth == null) {
return Cost.UNKNOWN;
}
+ double inputBytes = inputRows * rowWidth;
+ double rowsBeforeHaving = RowCountVisitor.aggregate(
+ RowCountVisitor.filter(
+ inputRows.doubleValue(),
+ RowCountVisitor.stripSkipScanFilter(
+ context.getScan().getFilter())),
+ groupBy);
+ double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
+ double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+ double bytesAfterHaving = rowWidth * rowsAfterHaving;
int parallelLevel = CostUtil.estimateParallelLevel(
true, context.getConnection().getQueryServices());
- Cost cost = CostUtil.estimateAggregateCost(byteCount,
- groupBy, aggregators.getEstimatedByteSize(), parallelLevel);
+ Cost cost = new Cost(0, 0, inputBytes);
+ Cost aggCost = CostUtil.estimateAggregateCost(
+ inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
+ cost = cost.plus(aggCost);
if (!orderBy.getOrderByExpressions().isEmpty()) {
- double outputBytes = CostUtil.estimateAggregateOutputBytes(
- byteCount, groupBy, aggregators.getEstimatedByteSize());
- Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+ parallelLevel = CostUtil.estimateParallelLevel(
+ false, context.getConnection().getQueryServices());
+ Cost orderByCost = CostUtil.estimateOrderByCost(
+ bytesAfterHaving, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return cost;
@@ -304,4 +322,9 @@ public class AggregatePlan extends BaseQueryPlan {
return false;
}
+ @Override
+ public <T> T accept(QueryPlanVisitor<T> visitor) {
+ return visitor.visit(this); // pass this plan to the visitor and return the visitor's result
+ }
+
}
[5/9] phoenix git commit: PHOENIX-3505 Avoid NPE on close() in
OrderedResultIterator
Posted by ma...@apache.org.
PHOENIX-3505 Avoid NPE on close() in OrderedResultIterator
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2c758234
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2c758234
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2c758234
Branch: refs/heads/4.x-HBase-0.98
Commit: 2c758234186e5a4d70cdc6501df19f9b0d9ec601
Parents: 7ef96fe
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 23 11:16:35 2016 -0500
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 20:08:46 2018 -0700
----------------------------------------------------------------------
.../phoenix/iterate/OrderedResultIterator.java | 5 ++-
.../iterate/OrderedResultIteratorTest.java | 41 ++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c758234/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 36ca00b..36b274a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -280,7 +280,10 @@ public class OrderedResultIterator implements PeekingResultIterator {
@Override
public void close() throws SQLException {
- resultIterator.close();
+ // Guard against resultIterator being null so that close() does not throw a NullPointerException
+ if (null != resultIterator) {
+ resultIterator.close();
+ }
resultIterator = PeekingResultIterator.EMPTY_ITERATOR;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c758234/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
new file mode 100644
index 0000000..50ed8e9
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.expression.OrderByExpression;
+import org.junit.Test;
+
+/**
+ * Test class for {@link OrderedResultIterator}.
+ */
+public class OrderedResultIteratorTest {
+
+ @Test
+ public void testNullIteratorOnClose() throws SQLException {
+ ResultIterator delegate = ResultIterator.EMPTY_ITERATOR;
+ List<OrderByExpression> orderByExpressions = Collections.singletonList(null); // one null placeholder; presumably never evaluated because the iterator is not advanced - TODO confirm
+ int thresholdBytes = Integer.MAX_VALUE;
+ OrderedResultIterator iterator = new OrderedResultIterator(delegate, orderByExpressions, thresholdBytes);
+ // close() is called without ever advancing the iterator; it should not throw an exception
+ iterator.close();
+ }
+
+}