You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/02/01 00:06:49 UTC
[17/35] phoenix git commit: PHOENIX-4288 Indexes not used when
ordering by primary key
PHOENIX-4288 Indexes not used when ordering by primary key
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d790c707
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d790c707
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d790c707
Branch: refs/heads/4.x-cdh5.11.2
Commit: d790c707550647728afd574e11787503fd0c231a
Parents: f94f4eb
Author: maryannxue <ma...@gmail.com>
Authored: Sun Nov 5 02:37:55 2017 +0000
Committer: Pedro Boado <pb...@apache.org>
Committed: Wed Jan 31 22:24:48 2018 +0000
----------------------------------------------------------------------
.../phoenix/end2end/CostBasedDecisionIT.java | 466 +++++++++++++++++++
.../apache/phoenix/end2end/MutationStateIT.java | 17 +
.../phoenix/compile/ListJarsQueryPlan.java | 6 +
.../org/apache/phoenix/compile/QueryPlan.java | 5 +-
.../apache/phoenix/compile/TraceQueryPlan.java | 6 +
.../apache/phoenix/execute/AggregatePlan.java | 30 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 21 +-
.../phoenix/execute/ClientAggregatePlan.java | 28 ++
.../apache/phoenix/execute/ClientScanPlan.java | 25 +
.../apache/phoenix/execute/CorrelatePlan.java | 25 +
.../phoenix/execute/DelegateQueryPlan.java | 6 +
.../apache/phoenix/execute/HashJoinPlan.java | 29 ++
.../execute/LiteralResultIterationPlan.java | 6 +
.../org/apache/phoenix/execute/ScanPlan.java | 25 +
.../phoenix/execute/SortMergeJoinPlan.java | 18 +
.../org/apache/phoenix/execute/UnionPlan.java | 10 +
.../apache/phoenix/jdbc/PhoenixStatement.java | 6 +
.../java/org/apache/phoenix/optimize/Cost.java | 123 +++++
.../apache/phoenix/optimize/QueryOptimizer.java | 30 +-
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../phoenix/query/QueryServicesOptions.java | 4 +
.../java/org/apache/phoenix/util/CostUtil.java | 90 ++++
.../query/ParallelIteratorsSplitTest.java | 6 +
23 files changed, 971 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
new file mode 100644
index 0000000..a3584ce
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+ props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+ props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+ props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrdering1() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey";
+ // Use the data table plan that opts out order-by when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+ plan.contains("FULL SCAN"));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the index table plan that has a lower cost when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+ plan.contains("RANGE SCAN"));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrdering2() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+ // Use the index table plan that opts out order-by when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+ plan.contains("RANGE SCAN"));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Given that the range on C1 is meaningless and group-by becomes
+ // order-preserving if using the data table, the data table plan should
+ // come out as the best plan based on the costs.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+ plan.contains("FULL SCAN"));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrdering3() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 INTEGER,\n" +
+ "c2 INTEGER,\n" +
+ "c3 INTEGER)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+ String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+ // Use the idx2 plan with a wider PK slot span when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String indexPlan =
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+ " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ stmt.setString(1, "k" + i);
+ stmt.setInt(2, i);
+ stmt.setInt(3, i);
+ stmt.setInt(4, i);
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the idx1 plan that scans less data when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String dataPlan =
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+ " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(dataPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrderingInUpsertQuery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 INTEGER,\n" +
+ "c2 INTEGER,\n" +
+ "c3 INTEGER)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+ String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+ // Use the idx2 plan with a wider PK slot span when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String indexPlan =
+ "UPSERT SELECT\n" +
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+ " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ stmt.setString(1, "k" + i);
+ stmt.setInt(2, i);
+ stmt.setInt(3, i);
+ stmt.setInt(4, i);
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the idx1 plan that scans less data when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String dataPlan =
+ "UPSERT SELECT\n" +
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+ " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(dataPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrderingInDeleteQuery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 INTEGER,\n" +
+ "c2 INTEGER,\n" +
+ "c3 INTEGER)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+ String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+ // Use the idx2 plan with a wider PK slot span when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String indexPlan =
+ "DELETE ROWS\n" +
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+ " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ stmt.setString(1, "k" + i);
+ stmt.setInt(2, i);
+ stmt.setInt(3, i);
+ stmt.setInt(4, i);
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the idx1 plan that scans less data when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String dataPlan =
+ "DELETE ROWS\n" +
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+ " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+ "CLIENT MERGE SORT";
+ assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(dataPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrderingInUnionQuery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 "
+ + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+ // Use the default plan when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String defaultPlan =
+ "UNION ALL OVER 2 QUERIES\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+ " CLIENT MERGE SORT\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+ " CLIENT MERGE SORT";
+ assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(defaultPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the optimal plan based on cost when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String optimizedPlan =
+ "UNION ALL OVER 2 QUERIES\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+ " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+ " SERVER FILTER BY C1 LIKE 'X%'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]";
+ assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(optimizedPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCostOverridesStaticPlanOrderingInJoinQuery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey VARCHAR PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 "
+ + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
+ + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
+ // Use the default plan when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ String defaultPlan =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+ " SERVER FILTER BY C1 LIKE 'X0%'\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+ " CLIENT MERGE SORT\n" +
+ " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
+ assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(defaultPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the optimal plan based on cost when stats become available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ String optimizedPlan =
+ "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+ " SERVER FILTER BY C1 LIKE 'X%'\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" +
+ " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)";
+ assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(optimizedPlan));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testHintOverridesCost() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ try {
+ String tableName = BaseTest.generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+ "rowkey INTEGER PRIMARY KEY,\n" +
+ "c1 VARCHAR,\n" +
+ "c2 VARCHAR)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+ String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where rowkey between 1 and 10 ORDER BY c1";
+ String hintedQuery = query.replaceFirst("SELECT",
+ "SELECT /*+ INDEX(" + tableName + " " + tableName + "_idx) */");
+ String dataPlan = "SERVER SORTED BY [C1]";
+ String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
+
+ // Use the index table plan that opts out order-by when stats are not available.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+ for (int i = 0; i < 10000; i++) {
+ int c1 = i % 16;
+ stmt.setInt(1, i);
+ stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+ stmt.setString(3, "c");
+ stmt.execute();
+ }
+
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+ // Use the data table plan that has a lower cost when stats are available.
+ rs = conn.createStatement().executeQuery("explain " + query);
+ plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(dataPlan));
+
+ // Use the index table plan as has been hinted.
+ rs = conn.createStatement().executeQuery("explain " + hintedQuery);
+ plan = QueryUtil.getExplainPlan(rs);
+ assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+ plan.contains(indexPlan));
+ } finally {
+ conn.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index 2d5f360..36782c1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.phoenix.end2end;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 839e7c9..0688b94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
@@ -186,6 +187,11 @@ public class ListJarsQueryPlan implements QueryPlan {
}
@Override
+ public Cost getCost() {
+ return Cost.ZERO;
+ }
+
+ @Override
public TableRef getTableRef() {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index f7cdcbf..ca88984 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -52,7 +53,9 @@ public interface QueryPlan extends StatementPlan {
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
public long getEstimatedSize();
-
+
+ public Cost getCost();
+
// TODO: change once joins are supported
TableRef getTableRef();
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 62e6991..2714858 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -46,6 +46,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
@@ -194,6 +195,11 @@ public class TraceQueryPlan implements QueryPlan {
}
@Override
+ public Cost getCost() {
+ return Cost.ZERO;
+ }
+
+ @Override
public Set<TableRef> getSourceRefs() {
return Collections.emptySet();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 4c29abe..369769e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
@@ -67,6 +68,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,7 +114,33 @@ public class AggregatePlan extends BaseQueryPlan {
public Expression getHaving() {
return having;
}
-
+
+ @Override
+ public Cost getCost() {
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // ignored.
+ }
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ int parallelLevel = CostUtil.estimateParallelLevel(
+ true, context.getConnection().getQueryServices());
+ Cost cost = CostUtil.estimateAggregateCost(byteCount,
+ groupBy, aggregators.getEstimatedByteSize(), parallelLevel);
+ if (!orderBy.getOrderByExpressions().isEmpty()) {
+ double outputBytes = CostUtil.estimateAggregateOutputBytes(
+ byteCount, groupBy, aggregators.getEstimatedByteSize());
+ Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+ cost = cost.plus(orderByCost);
+ }
+ return cost;
+ }
+
@Override
public List<KeyRange> getSplits() {
if (splits == null)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index c1ddd44..31f67b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -63,6 +63,8 @@ import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
@@ -500,13 +502,24 @@ public abstract class BaseQueryPlan implements QueryPlan {
if (context.getScanRanges() == ScanRanges.NOTHING) {
return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString()));
}
-
+
+ // If cost-based optimizer is enabled, we need to initialize a dummy iterator to
+ // get the stats for computing costs.
+ boolean costBased =
+ context.getConnection().getQueryServices().getConfiguration().getBoolean(
+ QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+ if (costBased) {
+ ResultIterator iterator = iterator();
+ iterator.close();
+ }
// Optimize here when getting explain plan, as queries don't get optimized until after compilation
QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan();
- this.estimatedRows = plan.getEstimatedRowsToScan();
- this.estimatedSize = plan.getEstimatedBytesToScan();
- this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+ if (!costBased) { // do not override estimates if they are used for cost calculation.
+ this.estimatedRows = plan.getEstimatedRowsToScan();
+ this.estimatedSize = plan.getEstimatedBytesToScan();
+ this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+ }
return exp;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 8ef1f8d..a15ab35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -56,12 +56,14 @@ import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.TupleUtil;
import com.google.common.collect.Lists;
@@ -87,6 +89,32 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
}
+ /**
+  * Estimates the cost of this client-side aggregation, added to the cost reported by
+  * the superclass. The aggregation (and the sort, when ORDER BY expressions exist) is
+  * costed with the parallel level estimated for an operation that is not running on
+  * the server.
+  *
+  * @return the combined cost, or {@link Cost#UNKNOWN} when no byte estimate is available
+  */
@Override
+ public Cost getCost() {
+     Long estimatedBytes;
+     try {
+         estimatedBytes = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // Best effort: a failed estimate is handled exactly like a missing one.
+         estimatedBytes = null;
+     }
+     if (estimatedBytes == null) {
+         return Cost.UNKNOWN;
+     }
+
+     int workers = CostUtil.estimateParallelLevel(
+             false, context.getConnection().getQueryServices());
+     Cost total = CostUtil.estimateAggregateCost(
+             estimatedBytes, groupBy, clientAggregators.getEstimatedByteSize(), workers);
+
+     boolean needsSort = !orderBy.getOrderByExpressions().isEmpty();
+     if (needsSort) {
+         // The sort consumes the aggregate's output, not the raw scanned bytes.
+         double aggregatedBytes = CostUtil.estimateAggregateOutputBytes(
+                 estimatedBytes, groupBy, clientAggregators.getEstimatedByteSize());
+         total = total.plus(CostUtil.estimateOrderByCost(aggregatedBytes, workers));
+     }
+     return super.getCost().plus(total);
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 6bbc545..5799990 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -34,10 +34,12 @@ import org.apache.phoenix.iterate.OrderedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
import com.google.common.collect.Lists;
@@ -50,6 +52,29 @@ public class ClientScanPlan extends ClientProcessingPlan {
}
+ /**
+  * Estimates the cost of this client-side scan, added to the cost reported by the
+  * superclass. The estimated bytes are charged as I/O, plus an order-by cost when
+  * ORDER BY expressions exist.
+  *
+  * @return the combined cost, or {@link Cost#UNKNOWN} when no byte estimate is available
+  */
@Override
+ public Cost getCost() {
+     Long estimatedBytes;
+     try {
+         estimatedBytes = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // Best effort: a failed estimate is handled exactly like a missing one.
+         estimatedBytes = null;
+     }
+     if (estimatedBytes == null) {
+         return Cost.UNKNOWN;
+     }
+
+     Cost total = new Cost(0, 0, estimatedBytes);
+     // Parallel level for an operation that is not running on the server.
+     int workers = CostUtil.estimateParallelLevel(
+             false, context.getConnection().getQueryServices());
+     boolean needsSort = !orderBy.getOrderByExpressions().isEmpty();
+     if (needsSort) {
+         total = total.plus(CostUtil.estimateOrderByCost(estimatedBytes, workers));
+     }
+     return super.getCost().plus(total);
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index ee81c36..270ad3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
@@ -200,4 +201,28 @@ public class CorrelatePlan extends DelegateQueryPlan {
return null;
}
+ /**
+  * Estimates the cost of this correlated plan as the product of the left-hand side's
+  * estimated bytes and the right-hand side's estimated rows (charged as I/O), plus the
+  * costs of both child plans.
+  *
+  * @return the combined cost, or {@link Cost#UNKNOWN} when either estimate is unavailable
+  */
+ @Override
+ public Cost getCost() {
+     Long lhsByteCount = null;
+     try {
+         lhsByteCount = delegate.getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // ignored: a failed estimate is treated as a missing one.
+     }
+     Long rhsRowCount = null;
+     try {
+         rhsRowCount = rhs.getEstimatedRowsToScan();
+     } catch (SQLException e) {
+         // ignored: a failed estimate is treated as a missing one.
+     }
+
+     if (lhsByteCount == null || rhsRowCount == null) {
+         return Cost.UNKNOWN;
+     }
+
+     // Multiply as doubles. A long * long product can silently overflow (even to a
+     // negative value) before being widened, which would make this plan look cheap.
+     double ioBytes = lhsByteCount.doubleValue() * rhsRowCount.doubleValue();
+     Cost cost = new Cost(0, 0, ioBytes);
+     Cost lhsCost = delegate.getCost();
+     return cost.plus(lhsCost).plus(rhs.getCost());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 3c62c5b..3da06db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -59,6 +60,11 @@ public abstract class DelegateQueryPlan implements QueryPlan {
}
+ /**
+  * Reports the cost of the wrapped plan unchanged.
+  *
+  * @return whatever {@code delegate.getCost()} returns
+  */
@Override
+ public Cost getCost() {
+     return delegate.getCost();
+ }
+
+ @Override
public TableRef getTableRef() {
return delegate.getTableRef();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2b90dcb..2d2ff4e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
@@ -290,6 +291,34 @@ public class HashJoinPlan extends DelegateQueryPlan {
return statement;
}
+ /**
+  * Estimates the cost of this hash join: the estimated bytes to scan (charged as I/O),
+  * plus the delegate plan's cost, plus the cost of every sub-plan's inner plan. When
+  * key range expressions are present the delegate's cost is scaled by a fixed
+  * selectivity.
+  *
+  * @return the combined cost, or {@link Cost#UNKNOWN} when no byte estimate is available
+  */
+ @Override
+ public Cost getCost() {
+     Long estimatedBytes;
+     try {
+         estimatedBytes = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // Best effort: a failed estimate is handled exactly like a missing one.
+         estimatedBytes = null;
+     }
+     if (estimatedBytes == null) {
+         return Cost.UNKNOWN;
+     }
+
+     Cost joinCost = new Cost(0, 0, estimatedBytes);
+
+     Cost leftCost = delegate.getCost();
+     if (keyRangeExpressions != null) {
+         // The selectivity of the dynamic rowkey filter.
+         // TODO replace the constant with an estimate value.
+         double selectivity = 0.01;
+         leftCost = leftCost.multiplyBy(selectivity);
+     }
+
+     Cost subPlansCost = Cost.ZERO;
+     for (SubPlan child : subPlans) {
+         subPlansCost = subPlansCost.plus(child.getInnerPlan().getCost());
+     }
+
+     return joinCost.plus(leftCost).plus(subPlansCost);
+ }
+
protected interface SubPlan {
public ServerCache execute(HashJoinPlan parent) throws SQLException;
public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 86f59c5..1d1332d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -60,6 +61,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
}
+ /**
+  * This plan is always reported as free.
+  *
+  * @return {@link Cost#ZERO}
+  */
@Override
+ public Cost getCost() {
+     return Cost.ZERO;
+ }
+
+ @Override
public List<KeyRange> getSplits() {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 1e1cb0d..31d7097 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -53,6 +53,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -64,6 +65,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -189,6 +191,29 @@ public class ScanPlan extends BaseQueryPlan {
}
+ /**
+  * Estimates the cost of this scan: the estimated bytes to scan (charged as I/O), plus
+  * an order-by cost when ORDER BY expressions exist. The sort is costed with the
+  * parallel level estimated for an operation running on the server.
+  *
+  * @return the estimated cost, or {@link Cost#UNKNOWN} when no byte estimate is available
+  */
@Override
+ public Cost getCost() {
+     Long estimatedBytes;
+     try {
+         estimatedBytes = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // Best effort: a failed estimate is handled exactly like a missing one.
+         estimatedBytes = null;
+     }
+     if (estimatedBytes == null) {
+         return Cost.UNKNOWN;
+     }
+
+     Cost scanCost = new Cost(0, 0, estimatedBytes);
+     int workers = CostUtil.estimateParallelLevel(
+             true, context.getConnection().getQueryServices());
+     boolean needsSort = !orderBy.getOrderByExpressions().isEmpty();
+     return needsSort
+             ? scanCost.plus(CostUtil.estimateOrderByCost(estimatedBytes, workers))
+             : scanCost;
+ }
+
+ @Override
public List<KeyRange> getSplits() {
if (splits == null)
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index fab7c59..3e380da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.query.KeyRange;
@@ -192,6 +193,23 @@ public class SortMergeJoinPlan implements QueryPlan {
}
+ /**
+  * Estimates the cost of this sort-merge join: the estimated bytes to scan (charged as
+  * I/O) plus the costs of the left-hand and right-hand plans.
+  *
+  * @return the combined cost, or {@link Cost#UNKNOWN} when no byte estimate is available
+  */
@Override
+ public Cost getCost() {
+     Long estimatedBytes;
+     try {
+         estimatedBytes = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // Best effort: a failed estimate is handled exactly like a missing one.
+         estimatedBytes = null;
+     }
+     if (estimatedBytes == null) {
+         return Cost.UNKNOWN;
+     }
+
+     Cost joinCost = new Cost(0, 0, estimatedBytes);
+     Cost withLeft = joinCost.plus(lhsPlan.getCost());
+     return withLeft.plus(rhsPlan.getCost());
+ }
+
+ @Override
public StatementContext getContext() {
return context;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index e06522f..e6bf654 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.UnionResultIterators;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -210,6 +211,15 @@ public class UnionPlan implements QueryPlan {
}
+ /**
+  * Sums the costs of all plans in this union, starting from {@link Cost#ZERO}.
+  *
+  * @return the accumulated cost of the child plans
+  */
@Override
+ public Cost getCost() {
+     Cost total = Cost.ZERO;
+     for (QueryPlan child : plans) {
+         Cost childCost = child.getCost();
+         total = total.plus(childCost);
+     }
+     return total;
+ }
+
+ @Override
public ParameterMetaData getParameterMetaData() {
return paramMetaData;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 384c8cc..26b4415 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.iterate.MaterializedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AddJarsStatement;
import org.apache.phoenix.parse.AliasedNode;
@@ -647,6 +648,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
+ /**
+  * No cost is attributed to this plan.
+  *
+  * @return {@link Cost#ZERO}
+  */
@Override
+ public Cost getCost() {
+     return Cost.ZERO;
+ }
+
+ @Override
public TableRef getTableRef() {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
new file mode 100644
index 0000000..b83f354
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.optimize;
+
+import java.util.Objects;
+
+/**
+ * Optimizer cost in terms of CPU, memory, and I/O usage, the unit of which is now the
+ * number of bytes processed.
+ *
+ * <p>All three components are stored in final fields. Ordering, equality and hashing
+ * currently consider only the I/O component; {@link #UNKNOWN} sorts after every other
+ * cost.
+ */
+public class Cost implements Comparable<Cost> {
+    /** The unknown cost. Recognized by identity, see {@link #isUnknown()}. */
+    public static final Cost UNKNOWN = new Cost(Double.NaN, Double.NaN, Double.NaN) {
+        @Override
+        public String toString() {
+            return "{unknown}";
+        }
+    };
+
+    /** The zero cost. */
+    public static final Cost ZERO = new Cost(0, 0, 0) {
+        @Override
+        public String toString() {
+            return "{zero}";
+        }
+    };
+
+    private final double cpu;
+    private final double memory;
+    private final double io;
+
+    /**
+     * @param cpu the CPU component of the cost
+     * @param memory the memory component of the cost
+     * @param io the I/O component of the cost
+     */
+    public Cost(double cpu, double memory, double io) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.io = io;
+    }
+
+    public double getCpu() {
+        return cpu;
+    }
+
+    public double getMemory() {
+        return memory;
+    }
+
+    public double getIo() {
+        return io;
+    }
+
+    /**
+     * @return true only for the {@link #UNKNOWN} singleton itself; a cost that merely
+     *         carries NaN components is not considered unknown
+     */
+    public boolean isUnknown() {
+        return this == UNKNOWN;
+    }
+
+    /**
+     * Adds two costs component-wise.
+     *
+     * @param other the cost to add
+     * @return the sum, or {@link #UNKNOWN} if either operand is unknown
+     */
+    public Cost plus(Cost other) {
+        if (isUnknown() || other.isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu + other.cpu,
+                this.memory + other.memory,
+                this.io + other.io);
+    }
+
+    /**
+     * Scales every component by the given factor.
+     *
+     * @param factor the multiplier
+     * @return the scaled cost, or {@link #UNKNOWN} if this cost is unknown
+     */
+    public Cost multiplyBy(double factor) {
+        if (isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu * factor,
+                this.memory * factor,
+                this.io * factor);
+    }
+
+    /**
+     * Orders costs by their I/O component, with {@link #UNKNOWN} after every known cost.
+     *
+     * TODO right now for simplicity, we choose to ignore CPU and memory costs. We may
+     * take those into account as our cost model matures.
+     */
+    @Override
+    public int compareTo(Cost other) {
+        if (isUnknown() && other.isUnknown()) {
+            return 0;
+        } else if (isUnknown() && !other.isUnknown()) {
+            return 1;
+        } else if (!isUnknown() && other.isUnknown()) {
+            return -1;
+        }
+
+        // Double.compare keeps the ordering antisymmetric for infinite and NaN values,
+        // where subtracting the operands would yield NaN and report "less than" in both
+        // directions. Adding 0.0 folds -0.0 into +0.0 so both zeros still compare equal.
+        return Double.compare(this.io + 0.0, other.io + 0.0);
+    }
+
+    /** Two costs are equal when {@link #compareTo(Cost)} reports them as equal. */
+    @Override
+    public boolean equals(Object obj) {
+        return this == obj
+                || (obj instanceof Cost && this.compareTo((Cost) obj) == 0);
+    }
+
+    /**
+     * Hashes only the I/O component, the sole input to {@link #equals(Object)}, so that
+     * equal costs always share a hash code.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(io + 0.0);
+    }
+
+    @Override
+    public String toString() {
+        return "{cpu: " + cpu + ", memory: " + memory + ", io: " + io + "}";
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index b3df50b..64dad58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -67,10 +67,12 @@ public class QueryOptimizer {
private final QueryServices services;
private final boolean useIndexes;
+ private final boolean costBased;
public QueryOptimizer(QueryServices services) {
this.services = services;
this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
+ this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
}
public QueryPlan optimize(PhoenixStatement statement, QueryPlan dataPlan) throws SQLException {
@@ -91,7 +93,7 @@ public class QueryOptimizer {
}
public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
- List<QueryPlan>plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
+ List<QueryPlan> plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
return plans.get(0);
}
@@ -309,10 +311,11 @@ public class QueryOptimizer {
}
return null;
}
-
+
/**
* Order the plans among all the possible ones from best to worst.
- * Since we don't keep stats yet, we use the following simple algorithm:
+ * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on
+ * their costs, otherwise we use the following simple algorithm:
* 1) If the query is a point lookup (i.e. we have a set of exact row keys), choose that one immediately.
* 2) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expression
* in the same order as the row key columns.
@@ -320,9 +323,6 @@ public class QueryOptimizer {
* a) the most row key columns that may be used to form the start/stop scan key (i.e. bound slots).
* b) the plan that preserves ordering for a group by.
* c) the non local index table plan
- * TODO: We should make more of a cost based choice: The largest number of bound slots does not necessarily
- * correspond to the least bytes scanned. We could consider the slots bound for upper and lower ranges
- * separately, or we could calculate the bytes scanned between the start and stop row of each table.
* @param plans the list of candidate plans
* @return list of plans ordered from best to worst.
*/
@@ -331,7 +331,21 @@ public class QueryOptimizer {
if (plans.size() == 1) {
return plans;
}
-
+
+ if (this.costBased) {
+ Collections.sort(plans, new Comparator<QueryPlan>() {
+ @Override
+ public int compare(QueryPlan plan1, QueryPlan plan2) {
+ return plan1.getCost().compareTo(plan2.getCost());
+ }
+ });
+ // Return ordered list based on cost if stats are available; otherwise fall
+ // back to static ordering.
+ if (!plans.get(0).getCost().isUnknown()) {
+ return stopAtBestPlan ? plans.subList(0, 1) : plans;
+ }
+ }
+
/**
* If we have a plan(s) that are just point lookups (i.e. fully qualified row
* keys), then favor those first.
@@ -428,7 +442,7 @@ public class QueryOptimizer {
}
});
-
+
return stopAtBestPlan ? bestCandidates.subList(0, 1) : bestCandidates;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 59f7385..0b80f4d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -294,6 +294,9 @@ public interface QueryServices extends SQLCloseable {
//Update Cache Frequency default config attribute
public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB = "phoenix.default.update.cache.frequency";
+ // Whether to enable cost-based-decision in the query optimizer
+ public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled";
+
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3ceb084..4d31974 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
+import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
@@ -341,6 +342,8 @@ public class QueryServicesOptions {
// RS -> RS calls for upsert select statements are disabled by default
public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false;
+ public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
+
private final Configuration config;
private QueryServicesOptions(Configuration config) {
@@ -418,6 +421,7 @@ public class QueryServicesOptions {
.setIfUnset(TRACING_THREAD_POOL_SIZE, DEFAULT_TRACING_THREAD_POOL_SIZE)
.setIfUnset(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED)
.setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION)
+ .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED)
.setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
.setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED);
// HBase sets this to 1, so we reset it to something more appropriate.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
new file mode 100644
index 0000000..1d4b8e0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.optimize.Cost;
+import org.apache.phoenix.query.QueryServices;
+
+/**
+ * Utilities for computing costs.
+ *
+ * Some of the methods here should eventually be replaced by a metadata framework which
+ * estimates output metrics for each QueryPlan or operation, e.g. row count, byte count,
+ * etc.
+ */
+public class CostUtil {
+
+    // An estimate of the ratio of result data from group-by against the input data.
+    private final static double GROUPING_FACTOR = 0.1;
+
+    // Fraction of the aggregation cost charged when the group-by preserves order.
+    private final static double ORDER_PRESERVING_FACTOR = 0.2;
+
+    // Io operations conducted in intermediate evaluations like sorting or aggregation
+    // should be counted twice since they usually involve both read and write.
+    private final static double IO_COST_MULTIPLIER = 2.0;
+
+    /**
+     * Estimate the number of output bytes of an aggregate.
+     * @param byteCount the number of input bytes
+     * @param groupBy the compiled GroupBy object
+     * @param aggregatorsSize the byte size of aggregators
+     * @return the output byte count: the aggregators' size for an ungrouped aggregate,
+     *         otherwise a fixed fraction of the input bytes
+     */
+    public static double estimateAggregateOutputBytes(
+            double byteCount, GroupBy groupBy, int aggregatorsSize) {
+        if (groupBy.isUngroupedAggregate()) {
+            return aggregatorsSize;
+        }
+        return byteCount * GROUPING_FACTOR;
+    }
+
+    /**
+     * Estimate the cost of an aggregate.
+     * @param byteCount the number of input bytes
+     * @param groupBy the compiled GroupBy object
+     * @param aggregatorsSize the byte size of aggregators
+     * @param parallelLevel number of parallel workers or threads; values below 1 are
+     *                      treated as 1
+     * @return the cost
+     */
+    public static Cost estimateAggregateCost(
+            double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) {
+        double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize);
+        double orderedFactor = groupBy.isOrderPreserving() ? ORDER_PRESERVING_FACTOR : 1.0;
+        return new Cost(0, 0,
+                outputBytes * orderedFactor * IO_COST_MULTIPLIER
+                        / effectiveParallelLevel(parallelLevel));
+    }
+
+    /**
+     * Estimate the cost of an order-by
+     * @param byteCount the number of input bytes
+     * @param parallelLevel number of parallel workers or threads; values below 1 are
+     *                      treated as 1
+     * @return the cost
+     */
+    public static Cost estimateOrderByCost(double byteCount, int parallelLevel) {
+        return new Cost(0, 0,
+                byteCount * IO_COST_MULTIPLIER / effectiveParallelLevel(parallelLevel));
+    }
+
+    /**
+     * Estimate the parallel level of an operation
+     * @param runningOnServer if the operation will be running on server side
+     * @param services the QueryServices object
+     * @return the parallel level
+     */
+    public static int estimateParallelLevel(boolean runningOnServer, QueryServices services) {
+        // TODO currently return constants for simplicity, should derive from cluster config.
+        return runningOnServer ? 10 : 1;
+    }
+
+    // Clamps the divisor to at least one worker: a zero or negative level would yield an
+    // infinite, NaN or negative cost and corrupt the ordering of candidate plans.
+    private static int effectiveParallelLevel(int parallelLevel) {
+        return Math.max(1, parallelLevel);
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d790c707/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 935d8cb..0f12d9c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
@@ -486,6 +487,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
public Long getEstimateInfoTimestamp() throws SQLException {
return null;
}
+
+ /** The stub plan used by this test is reported as free. */
+ @Override
+ public Cost getCost() {
+     return Cost.ZERO;
+ }
}, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null);
List<KeyRange> keyRanges = parallelIterators.getSplits();