You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2018/03/14 04:37:32 UTC

[1/9] phoenix git commit: PHOENIX-4288 Indexes not used when ordering by primary key

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 1e83415f1 -> 9bb7811f0


PHOENIX-4288 Indexes not used when ordering by primary key


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/541d6ac2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/541d6ac2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/541d6ac2

Branch: refs/heads/4.x-HBase-0.98
Commit: 541d6ac22866fe7571365e063a23108c6ca1ea63
Parents: 1e83415
Author: maryannxue <ma...@gmail.com>
Authored: Tue Dec 5 10:52:46 2017 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 17:16:04 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/CostBasedDecisionIT.java    | 466 +++++++++++++++++++
 .../phoenix/compile/ListJarsQueryPlan.java      |   6 +
 .../org/apache/phoenix/compile/QueryPlan.java   |   5 +-
 .../apache/phoenix/compile/TraceQueryPlan.java  |   6 +
 .../apache/phoenix/execute/AggregatePlan.java   |  30 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  19 +-
 .../phoenix/execute/ClientAggregatePlan.java    |  28 ++
 .../apache/phoenix/execute/ClientScanPlan.java  |  25 +
 .../apache/phoenix/execute/CorrelatePlan.java   |  25 +
 .../phoenix/execute/DelegateQueryPlan.java      |   6 +
 .../apache/phoenix/execute/HashJoinPlan.java    |  29 ++
 .../execute/LiteralResultIterationPlan.java     |   6 +
 .../org/apache/phoenix/execute/ScanPlan.java    |  25 +
 .../phoenix/execute/SortMergeJoinPlan.java      |  18 +
 .../org/apache/phoenix/execute/UnionPlan.java   |  10 +
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   6 +
 .../java/org/apache/phoenix/optimize/Cost.java  | 123 +++++
 .../apache/phoenix/optimize/QueryOptimizer.java |  28 +-
 .../org/apache/phoenix/query/QueryServices.java |   2 +
 .../phoenix/query/QueryServicesOptions.java     |   6 +-
 .../java/org/apache/phoenix/util/CostUtil.java  |  90 ++++
 .../query/ParallelIteratorsSplitTest.java       |   6 +
 22 files changed, 951 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
new file mode 100644
index 0000000..a3584ce
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1); // NOTE(review): sized for 1 entry, but 4 properties are put below
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+        props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering1() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey";
+            // Before stats are collected, expect the data table plan (FULL SCAN), which opts out of the order-by.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("FULL SCAN"));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Once stats are available, expect the index table plan (RANGE SCAN), which has a lower cost.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("RANGE SCAN"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering2() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            // Before stats are collected, expect the index table plan (RANGE SCAN). NOTE(review): the original wording said it "opts out order-by", but this query has GROUP BY and no ORDER BY - confirm intent.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("RANGE SCAN"));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Given that the range on C1 is meaningless and group-by becomes
+            // order-preserving if using the data table, the data table plan should
+            // come out as the best plan based on the costs.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("FULL SCAN"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering3() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+            String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Before stats are collected, expect the idx2 plan, which has a wider PK slot span.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+                    "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+                    "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Once stats are available, expect the idx1 plan (range scan on c1), which scans less data.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan = // NOTE(review): despite its name, this holds the idx1 (local index on c1) plan, not a data table plan
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+                    "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+                    "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInUpsertQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+            String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Before stats are collected, expect the idx2 plan, which has a wider PK slot span.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "UPSERT SELECT\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+                            "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Once stats are available, expect the idx1 plan (range scan on c1), which scans less data.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan = // NOTE(review): despite its name, this holds the idx1 (local index on c1) plan, not a data table plan
+                    "UPSERT SELECT\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+                            "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInDeleteQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+            String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Before stats are collected, expect the idx2 plan, which has a wider PK slot span.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "DELETE ROWS\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+                            "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Once stats are available, expect the idx1 plan (range scan on c1), which scans less data.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan = // NOTE(review): despite its name, this holds the idx1 (local index on c1) plan, not a data table plan
+                    "DELETE ROWS\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+                            "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInUnionQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 "
+                    + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            // Before stats are collected, expect the default plan: a rowkey range scan for the first branch, an index range scan for the second.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String defaultPlan =
+                    "UNION ALL OVER 2 QUERIES\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" +
+                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+                    "    CLIENT MERGE SORT\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+                    "    CLIENT MERGE SORT";
+            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(defaultPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Once stats are available, expect the plan chosen by cost: an index scan for the first branch, a full data table scan for the second.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String optimizedPlan =
+                    "UNION ALL OVER 2 QUERIES\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" +
+                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+                    "    CLIENT MERGE SORT\n" +
+                    "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+                    "        SERVER FILTER BY C1 LIKE 'X%'\n" +
+                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]";
+            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(optimizedPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInJoinQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 "
+                    + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
+                    + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
+            // Before stats are collected, expect the default plan: a full data table scan for t1, an index range scan for the subquery.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String defaultPlan =
+                    "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+                    "    SERVER FILTER BY C1 LIKE 'X0%'\n" +
+                    "    PARALLEL INNER-JOIN TABLE 0\n" +
+                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
+                    "            SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "            SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+                    "        CLIENT MERGE SORT\n" +
+                    "    DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
+            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(defaultPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Once stats are available, expect the plan chosen by cost: an index range scan for t1, a full data table scan for the subquery.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String optimizedPlan =
+                    "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" + // NOTE(review): 626-WAY is hard-coded; presumably tied to the guidepost width set in doSetup - confirm it is stable
+                    "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "    SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
+                    "CLIENT MERGE SORT\n" +
+                    "    PARALLEL INNER-JOIN TABLE 0\n" +
+                    "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+                    "            SERVER FILTER BY C1 LIKE 'X%'\n" +
+                    "            SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" +
+                    "    DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)";
+            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(optimizedPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testHintOverridesCost() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey INTEGER PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where rowkey between 1 and 10 ORDER BY c1";
+            String hintedQuery = query.replaceFirst("SELECT",
+                    "SELECT  /*+ INDEX(" + tableName + " " + tableName + "_idx) */");
+            String dataPlan = "SERVER SORTED BY [C1]";
+            String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
+
+            // Before stats are collected, expect the index table plan, which opts out of the order-by.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setInt(1, i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Once stats are available, expect the data table plan, which has a lower cost.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(dataPlan));
+
+            // With the INDEX hint, expect the index table plan again even though stats are now available.
+            rs = conn.createStatement().executeQuery("explain " + hintedQuery);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+        } finally {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 1888114..fa48e52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
@@ -186,6 +187,11 @@ public class ListJarsQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        return Cost.ZERO;
+    }
+
+    @Override
     public TableRef getTableRef() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index f7cdcbf..ca88984 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -52,7 +53,9 @@ public interface QueryPlan extends StatementPlan {
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
 
     public long getEstimatedSize();
-    
+
+    public Cost getCost();
+
     // TODO: change once joins are supported
     TableRef getTableRef();
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 2eab965..5cd1d08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
@@ -200,6 +201,11 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        return Cost.ZERO;
+    }
+
+    @Override
     public Set<TableRef> getSourceRefs() {
         return Collections.emptySet();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 37e0c5a..2e042e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
@@ -67,6 +68,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,7 +114,33 @@ public class AggregatePlan extends BaseQueryPlan {
     public Expression getHaving() {
         return having;
     }
-    
+
+    @Override
+    public Cost getCost() {
+        Long byteCount = null;
+        try {
+            byteCount = getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            // Ignored: an unavailable estimate leaves byteCount null, yielding Cost.UNKNOWN below.
+        }
+
+        if (byteCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        int parallelLevel = CostUtil.estimateParallelLevel(
+                true, context.getConnection().getQueryServices());
+        Cost cost = CostUtil.estimateAggregateCost(byteCount,
+                groupBy, aggregators.getEstimatedByteSize(), parallelLevel);
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            double outputBytes = CostUtil.estimateAggregateOutputBytes(
+                    byteCount, groupBy, aggregators.getEstimatedByteSize());
+            Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+            cost = cost.plus(orderByCost);
+        }
+        return cost;
+    }
+
     @Override
     public List<KeyRange> getSplits() {
         if (splits == null)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index df55e63..38ed926 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -504,13 +504,24 @@ public abstract class BaseQueryPlan implements QueryPlan {
         if (context.getScanRanges() == ScanRanges.NOTHING) {
             return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString()));
         }
-        
+
+        // If the cost-based optimizer is enabled, we need to initialize a dummy iterator to
+        // get the stats for computing costs.
+        boolean costBased =
+                context.getConnection().getQueryServices().getConfiguration().getBoolean(
+                        QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+        if (costBased) {
+            ResultIterator iterator = iterator();
+            iterator.close();
+        }
         // Optimize here when getting explain plan, as queries don't get optimized until after compilation
         QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
         ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan();
-        this.estimatedRows = plan.getEstimatedRowsToScan();
-        this.estimatedSize = plan.getEstimatedBytesToScan();
-        this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+        if (!costBased) { // do not override estimates if they are used for cost calculation.
+            this.estimatedRows = plan.getEstimatedRowsToScan();
+            this.estimatedSize = plan.getEstimatedBytesToScan();
+            this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+        }
         return exp;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 8ef1f8d..a15ab35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -56,12 +56,14 @@ import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.TupleUtil;
 
 import com.google.common.collect.Lists;
@@ -87,6 +89,32 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
     }
 
     @Override
+    public Cost getCost() {
+        Long byteCount = null;
+        try {
+            byteCount = getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            // Ignored: an unavailable estimate leaves byteCount null, yielding Cost.UNKNOWN below.
+        }
+
+        if (byteCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        int parallelLevel = CostUtil.estimateParallelLevel(
+                false, context.getConnection().getQueryServices());
+        Cost cost = CostUtil.estimateAggregateCost(byteCount,
+                groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel);
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            double outputBytes = CostUtil.estimateAggregateOutputBytes(
+                    byteCount, groupBy, clientAggregators.getEstimatedByteSize());
+            Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+            cost = cost.plus(orderByCost);
+        }
+        return super.getCost().plus(cost);
+    }
+
+    @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 6bbc545..5799990 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -34,10 +34,12 @@ import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
 
 import com.google.common.collect.Lists;
 
@@ -50,6 +52,29 @@ public class ClientScanPlan extends ClientProcessingPlan {
     }
 
     @Override
+    public Cost getCost() {
+        Long byteCount = null;
+        try {
+            byteCount = getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            // Ignored: an unavailable estimate leaves byteCount null, yielding Cost.UNKNOWN below.
+        }
+
+        if (byteCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost cost = new Cost(0, 0, byteCount);
+        int parallelLevel = CostUtil.estimateParallelLevel(
+                false, context.getConnection().getQueryServices());
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+            cost = cost.plus(orderByCost);
+        }
+        return super.getCost().plus(cost);
+    }
+
+    @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index ee81c36..270ad3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
@@ -200,4 +201,28 @@ public class CorrelatePlan extends DelegateQueryPlan {
         return null;
     }
 
+    @Override
+    public Cost getCost() {
+        Long lhsByteCount = null;
+        try {
+            lhsByteCount = delegate.getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            // Ignored: an unavailable estimate leaves lhsByteCount null, yielding Cost.UNKNOWN below.
+        }
+        Long rhsRowCount = null;
+        try {
+            rhsRowCount = rhs.getEstimatedRowsToScan();
+        } catch (SQLException e) {
+            // Ignored: an unavailable estimate leaves rhsRowCount null, yielding Cost.UNKNOWN below.
+        }
+
+        if (lhsByteCount == null || rhsRowCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost cost = new Cost(0, 0, lhsByteCount * rhsRowCount);
+        Cost lhsCost = delegate.getCost();
+        return cost.plus(lhsCost).plus(rhs.getCost());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 3c62c5b..3da06db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -59,6 +60,11 @@ public abstract class DelegateQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        return delegate.getCost();
+    }
+
+    @Override
     public TableRef getTableRef() {
         return delegate.getTableRef();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2b90dcb..2d2ff4e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
@@ -290,6 +291,34 @@ public class HashJoinPlan extends DelegateQueryPlan {
         return statement;
     }
 
+    @Override
+    public Cost getCost() {
+        Long byteCount = null;
+        try {
+            byteCount = getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            // Ignored: an unavailable estimate leaves byteCount null, yielding Cost.UNKNOWN below.
+        }
+
+        if (byteCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost cost = new Cost(0, 0, byteCount);
+        Cost lhsCost = delegate.getCost();
+        if (keyRangeExpressions != null) {
+            // The selectivity of the dynamic rowkey filter.
+            // TODO replace the constant with an estimate value.
+            double selectivity = 0.01;
+            lhsCost = lhsCost.multiplyBy(selectivity);
+        }
+        Cost rhsCost = Cost.ZERO;
+        for (SubPlan subPlan : subPlans) {
+            rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+        }
+        return cost.plus(lhsCost).plus(rhsCost);
+    }
+
     protected interface SubPlan {
         public ServerCache execute(HashJoinPlan parent) throws SQLException;
         public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 4470947..c9abb69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -60,6 +61,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        return Cost.ZERO;
+    }
+
+    @Override
     public List<KeyRange> getSplits() {
         return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index af25bff..d63950c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -65,6 +66,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -193,6 +195,29 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        Long byteCount = null;
+        try {
+            byteCount = getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            // Ignored: an unavailable estimate leaves byteCount null, yielding Cost.UNKNOWN below.
+        }
+
+        if (byteCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost cost = new Cost(0, 0, byteCount);
+        int parallelLevel = CostUtil.estimateParallelLevel(
+                true, context.getConnection().getQueryServices());
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+            cost = cost.plus(orderByCost);
+        }
+        return cost;
+    }
+
+    @Override
     public List<KeyRange> getSplits() {
         if (splits == null)
             return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index fab7c59..3e380da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.query.KeyRange;
@@ -192,6 +193,23 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        Long byteCount = null;
+        try {
+            byteCount = getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            // Ignored: an unavailable estimate leaves byteCount null, yielding Cost.UNKNOWN below.
+        }
+
+        if (byteCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost cost = new Cost(0, 0, byteCount);
+        return cost.plus(lhsPlan.getCost()).plus(rhsPlan.getCost());
+    }
+
+    @Override
     public StatementContext getContext() {
         return context;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index e06522f..e6bf654 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.UnionResultIterators;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -210,6 +211,15 @@ public class UnionPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        Cost cost = Cost.ZERO;
+        for (QueryPlan plan : plans) {
+            cost = cost.plus(plan.getCost());
+        }
+        return cost;
+    }
+
+    @Override
     public ParameterMetaData getParameterMetaData() {
         return paramMetaData;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f418bc9..94aeb34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AddJarsStatement;
 import org.apache.phoenix.parse.AliasedNode;
@@ -644,6 +645,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 }
 
                 @Override
+                public Cost getCost() {
+                    return Cost.ZERO;
+                }
+
+                @Override
                 public TableRef getTableRef() {
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
new file mode 100644
index 0000000..b83f354
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.optimize;
+
+import java.util.Objects;
+
+/**
+ * Optimizer cost in terms of CPU, memory, and I/O usage; the unit of each is currently
+ * the number of bytes processed.
+ *
+ */
+public class Cost implements Comparable<Cost> {
+    /** The unknown cost. */
+    public static Cost UNKNOWN = new Cost(Double.NaN, Double.NaN, Double.NaN) {
+        @Override
+        public String toString() {
+            return "{unknown}";
+        }
+    };
+
+    /** The zero cost. */
+    public static Cost ZERO = new Cost(0, 0, 0) {
+        @Override
+        public String toString() {
+            return "{zero}";
+        }        
+    };
+
+    private final double cpu;
+    private final double memory;
+    private final double io;
+
+    public Cost(double cpu, double memory, double io) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.io = io;
+    }
+
+    public double getCpu() {
+        return cpu;
+    }
+
+    public double getMemory() {
+        return memory;
+    }
+
+    public double getIo() {
+        return io;
+    }
+
+    public boolean isUnknown() {
+        return this == UNKNOWN;
+    }
+
+    public Cost plus(Cost other) {
+        if (isUnknown() || other.isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu + other.cpu,
+                this.memory + other.memory,
+                this.io + other.io);
+    }
+
+    public Cost multiplyBy(double factor) {
+        if (isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu * factor,
+                this.memory * factor,
+                this.io * factor);
+    }
+
+    // TODO: for simplicity, we currently ignore CPU and memory costs and compare on I/O
+    // only. We may take those into account as our cost model matures.
+    @Override
+    public int compareTo(Cost other) {
+        if (isUnknown() && other.isUnknown()) {
+            return 0;
+        } else if (isUnknown() && !other.isUnknown()) {
+            return 1;
+        } else if (!isUnknown() && other.isUnknown()) {
+            return -1;
+        }
+
+        double d = this.io - other.io;
+        return d == 0 ? 0 : (d > 0 ? 1 : -1);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return this == obj
+                || (obj instanceof Cost && this.compareTo((Cost) obj) == 0);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(cpu, memory, io);
+    }
+
+    @Override
+    public String toString() {
+        return "{cpu: " + cpu + ", memory: " + memory + ", io: " + io + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 028fc94..8481bc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -69,11 +69,13 @@ public class QueryOptimizer {
 
     private final QueryServices services;
     private final boolean useIndexes;
+    private final boolean costBased;
     private long indexPendingDisabledThreshold;
 
     public QueryOptimizer(QueryServices services) {
         this.services = services;
         this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
+        this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
         this.indexPendingDisabledThreshold = this.services.getProps().getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD,
             QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD);
     }
@@ -96,7 +98,7 @@ public class QueryOptimizer {
     }
     
     public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
-        List<QueryPlan>plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
+        List<QueryPlan> plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
         return plans.get(0);
     }
     
@@ -329,7 +331,8 @@ public class QueryOptimizer {
 
     /**
      * Order the plans among all the possible ones from best to worst.
-     * Since we don't keep stats yet, we use the following simple algorithm:
+     * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on
+     * their costs, otherwise we use the following simple algorithm:
      * 1) If the query is a point lookup (i.e. we have a set of exact row keys), choose that one immediately.
      * 2) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expression
      * in the same order as the row key columns.
@@ -337,9 +340,6 @@ public class QueryOptimizer {
      *    a) the most row key columns that may be used to form the start/stop scan key (i.e. bound slots).
      *    b) the plan that preserves ordering for a group by.
      *    c) the non local index table plan
-     * TODO: We should make more of a cost based choice: The largest number of bound slots does not necessarily
-     * correspond to the least bytes scanned. We could consider the slots bound for upper and lower ranges 
-     * separately, or we could calculate the bytes scanned between the start and stop row of each table.
      * @param plans the list of candidate plans
      * @return list of plans ordered from best to worst.
      */
@@ -348,7 +348,21 @@ public class QueryOptimizer {
         if (plans.size() == 1) {
             return plans;
         }
-        
+
+        if (this.costBased) {
+            Collections.sort(plans, new Comparator<QueryPlan>() {
+                @Override
+                public int compare(QueryPlan plan1, QueryPlan plan2) {
+                    return plan1.getCost().compareTo(plan2.getCost());
+                }
+            });
+            // Return ordered list based on cost if stats are available; otherwise fall
+            // back to static ordering.
+            if (!plans.get(0).getCost().isUnknown()) {
+                return stopAtBestPlan ? plans.subList(0, 1) : plans;
+            }
+        }
+
         /**
          * If we have a plan(s) that are just point lookups (i.e. fully qualified row
          * keys), then favor those first.
@@ -448,7 +462,7 @@ public class QueryOptimizer {
             }
             
         });
-        
+
         return stopAtBestPlan ? bestCandidates.subList(0, 1) : bestCandidates;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 56d6e06..9dd3074 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -290,6 +290,8 @@ public interface QueryServices extends SQLCloseable {
     //Update Cache Frequency default config attribute
     public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB  = "phoenix.default.update.cache.frequency";
 
+    // Whether to enable cost-based decisions in the query optimizer
+    public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled";
     public static final String SMALL_SCAN_THRESHOLD_ATTRIB = "phoenix.query.smallScanThreshold";
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 33a319b..11d9784 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
+import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
@@ -329,6 +330,8 @@ public class QueryServicesOptions {
     // RS -> RS calls for upsert select statements are disabled by default
     public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false;
 
+    public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
+
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {
@@ -403,7 +406,8 @@ public class QueryServicesOptions {
             .setIfUnset(PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD, DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD)
             .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
             .setIfUnset(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED)
-            .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION);
+            .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION)
+            .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
         // it to 1, so we'll change it.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
new file mode 100644
index 0000000..1d4b8e0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.optimize.Cost;
+import org.apache.phoenix.query.QueryServices;
+
+/**
+ * Utilities for computing costs.
+ *
+ * Some of the methods here should eventually be replaced by a metadata framework which
+ * estimates output metrics for each QueryPlan or operation, e.g. row count, byte count,
+ * etc.
+ */
+public class CostUtil {
+
+    // An estimate of the ratio of result data from group-by against the input data.
+    private final static double GROUPING_FACTOR = 0.1;
+
+    // I/O operations conducted in intermediate evaluations like sorting or aggregation
+    // should be counted twice since they usually involve both a read and a write.
+    private final static double IO_COST_MULTIPLIER = 2.0;
+
+    /**
+     * Estimate the number of output bytes of an aggregate.
+     * @param byteCount the number of input bytes
+     * @param groupBy the compiled GroupBy object
+     * @param aggregatorsSize the byte size of aggregators
+     * @return the output byte count
+     */
+    public static double estimateAggregateOutputBytes(
+            double byteCount, GroupBy groupBy, int aggregatorsSize) {
+        if (groupBy.isUngroupedAggregate()) {
+            return aggregatorsSize;
+        }
+        return byteCount * GROUPING_FACTOR;
+    }
+
+    /**
+     * Estimate the cost of an aggregate.
+     * @param byteCount the number of input bytes
+     * @param groupBy the compiled GroupBy object
+     * @param aggregatorsSize the byte size of aggregators
+     * @param parallelLevel number of parallel workers or threads
+     * @return the cost
+     */
+    public static Cost estimateAggregateCost(
+            double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) {
+        double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize);
+        double orderedFactor = groupBy.isOrderPreserving() ? 0.2 : 1.0;
+        return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel);
+    }
+
+    /**
+     * Estimate the cost of an order-by
+     * @param byteCount the number of input bytes
+     * @param parallelLevel number of parallel workers or threads
+     * @return the cost
+     */
+    public static Cost estimateOrderByCost(double byteCount, int parallelLevel) {
+        return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel);
+    }
+
+    /**
+     * Estimate the parallel level of an operation
+     * @param runningOnServer if the operation will be running on server side
+     * @param services the QueryServices object
+     * @return the parallel level
+     */
+    public static int estimateParallelLevel(boolean runningOnServer, QueryServices services) {
+        // TODO: currently returns constants for simplicity ('services' is not consulted yet); should be derived from cluster config.
+        return runningOnServer ? 10 : 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index cb34d2b..1903dda 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
@@ -486,6 +487,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             public Long getEstimateInfoTimestamp() throws SQLException {
                 return null;
             }
+
+            @Override
+            public Cost getCost() {
+                return Cost.ZERO;
+            }
             
         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null, null);
         List<KeyRange> keyRanges = parallelIterators.getSplits();


[8/9] phoenix git commit: PHOENIX-4585 Prune local index regions used for join queries

Posted by ma...@apache.org.
PHOENIX-4585 Prune local index regions used for join queries


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/babda325
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/babda325
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/babda325

Branch: refs/heads/4.x-HBase-0.98
Commit: babda3258921fdf4de595ba734d972860d58a0a4
Parents: 6914d54
Author: maryannxue <ma...@gmail.com>
Authored: Fri Feb 16 11:29:25 2018 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 21:31:00 2018 -0700

----------------------------------------------------------------------
 .../apache/phoenix/compile/JoinCompiler.java    |  37 ++--
 .../apache/phoenix/compile/QueryCompiler.java   |  60 +++---
 .../phoenix/compile/QueryCompilerTest.java      | 186 ++++++++++++++++++-
 3 files changed, 238 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/babda325/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f5a7e39..4020cf9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1199,7 +1199,8 @@ public class JoinCompiler {
         return AndExpression.create(expressions);
     }
 
-    public static SelectStatement optimize(PhoenixStatement statement, SelectStatement select, final ColumnResolver resolver) throws SQLException {
+    public static Pair<SelectStatement, Map<TableRef, QueryPlan>> optimize(
+            PhoenixStatement statement, SelectStatement select, final ColumnResolver resolver) throws SQLException {
         TableRef groupByTableRef = null;
         TableRef orderByTableRef = null;
         if (select.getGroupBy() != null && !select.getGroupBy().isEmpty()) {
@@ -1226,7 +1227,7 @@ public class JoinCompiler {
             QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false, null);
             List<Object> binds = statement.getParameters();
             StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
-            QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null);
+            QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null, Collections.<TableRef, QueryPlan>emptyMap());
             TableRef table = plan.getTableRef();
             if (groupByTableRef != null && !groupByTableRef.equals(table)) {
                 groupByTableRef = null;
@@ -1236,7 +1237,8 @@ public class JoinCompiler {
             }
         }
 
-        final Map<TableRef, TableRef> replacement = new HashMap<TableRef, TableRef>();
+        Map<TableRef, TableRef> replacementMap = null;
+        Map<TableRef, QueryPlan> dataPlanMap = null;
 
         for (Table table : join.getTables()) {
             if (table.isSubselect())
@@ -1245,19 +1247,30 @@ public class JoinCompiler {
             List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null;
             List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null;
             SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), table.getTableSamplingRate(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes());
-            // TODO: As port of PHOENIX-4585, we need to make sure this plan has a pointer to the data plan
-            // when an index is used instead of the data table, and that this method returns that
-            // state for downstream processing.
             // TODO: It seems inefficient to be recompiling the statement again and again inside of this optimize call
-            QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt);
-            if (!plan.getTableRef().equals(tableRef)) {
-                replacement.put(tableRef, plan.getTableRef());
+            QueryPlan dataPlan =
+                    new QueryCompiler(
+                            statement, stmt,
+                            FromCompiler.getResolverForQuery(stmt, statement.getConnection()),
+                            false, null)
+                    .compile();
+            QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, dataPlan);
+            TableRef newTableRef = plan.getTableRef();
+            if (!newTableRef.equals(tableRef)) {
+                if (replacementMap == null) {
+                    replacementMap = new HashMap<TableRef, TableRef>();
+                    dataPlanMap = new HashMap<TableRef, QueryPlan>();
+                }
+                replacementMap.put(tableRef, newTableRef);
+                dataPlanMap.put(newTableRef, dataPlan);
             }
         }
 
-        if (replacement.isEmpty())
-            return select;
+        if (replacementMap == null)
+            return new Pair<SelectStatement, Map<TableRef, QueryPlan>>(
+                    select, Collections.<TableRef, QueryPlan> emptyMap());
 
+        final Map<TableRef, TableRef> replacement = replacementMap;
         TableNode from = select.getFrom();
         TableNode newFrom = from.accept(new TableNodeVisitor<TableNode>() {
             private TableRef resolveTable(String alias, TableName name) throws SQLException {
@@ -1319,7 +1332,7 @@ public class JoinCompiler {
             // replace expressions with corresponding matching columns for functional indexes
             indexSelect = ParseNodeRewriter.rewrite(indexSelect, new  IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), statement.getConnection(), indexSelect.getUdfParseNodes()));
         } 
-        return indexSelect;
+        return new Pair<SelectStatement, Map<TableRef, QueryPlan>>(indexSelect, dataPlanMap);
     }
 
     private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, Double tableSamplingRate, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/babda325/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index c8650b9..855b143 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -22,9 +22,9 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.client.Query;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -184,7 +184,7 @@ public class QueryCompiler {
             select.hasWildcard() ? null : select.getSelect());
         ColumnResolver resolver = FromCompiler.getResolver(tableRef);
         StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
-        QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false);
+        QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false, null);
         plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(),
             plan.getOffset(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans,
             context.getBindManager().getParameterMetaData());
@@ -195,15 +195,18 @@ public class QueryCompiler {
         List<Object> binds = statement.getParameters();
         StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
         if (select.isJoin()) {
-            select = JoinCompiler.optimize(statement, select, resolver);
-            if (this.select != select) {
-                ColumnResolver resolver = FromCompiler.getResolverForQuery(select, statement.getConnection());
+            Pair<SelectStatement, Map<TableRef, QueryPlan>> optimized =
+                    JoinCompiler.optimize(statement, select, resolver);
+            SelectStatement optimizedSelect = optimized.getFirst();
+            if (select != optimizedSelect) {
+                ColumnResolver resolver = FromCompiler.getResolverForQuery(optimizedSelect, statement.getConnection());
                 context = new StatementContext(statement, resolver, scan, sequenceManager);
             }
-            JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
-            return compileJoinQuery(context, binds, joinTable, false, false, null);
+            JoinTable joinTable = JoinCompiler.compile(statement, optimizedSelect, context.getResolver());
+            return compileJoinQuery(
+                    context, binds, joinTable, false, false, null, optimized.getSecond());
         } else {
-            return compileSingleQuery(context, select, binds, false, true);
+            return compileSingleQuery(context, select, binds, false, true, dataPlan);
         }
     }
 
@@ -216,7 +219,7 @@ public class QueryCompiler {
      *      2) Otherwise, return the join plan compiled with the default strategy.
      * @see JoinCompiler.JoinTable#getApplicableJoinStrategies()
      */
-    protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
+    protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
         if (joinTable.getJoinSpecs().isEmpty()) {
             Table table = joinTable.getTable();
             SelectStatement subquery = table.getAsSubquery(orderBy);
@@ -227,7 +230,8 @@ public class QueryCompiler {
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), projector);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
                 table.projectColumns(context.getScan());
-                return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true);
+                QueryPlan dataPlan = dataPlans.get(table.getTableRef());
+                return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true, dataPlan);
             }
             QueryPlan plan = compileSubquery(subquery, false);
             PTable projectedTable = table.createProjectedTable(plan.getProjector());
@@ -239,7 +243,7 @@ public class QueryCompiler {
         assert strategies.size() > 0;
         if (!costBased || strategies.size() == 1) {
             return compileJoinQuery(
-                    strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+                    strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy, dataPlans);
         }
 
         QueryPlan bestPlan = null;
@@ -248,7 +252,7 @@ public class QueryCompiler {
             StatementContext newContext = new StatementContext(
                     context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager());
             QueryPlan plan = compileJoinQuery(
-                    strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+                    strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy, dataPlans);
             Cost cost = plan.getCost();
             if (bestPlan == null || cost.compareTo(bestCost) < 0) {
                 bestPlan = plan;
@@ -260,7 +264,7 @@ public class QueryCompiler {
         return bestPlan;
     }
 
-    protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
+    protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
         byte[] emptyByteArray = new byte[0];
         List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
         switch (strategy) {
@@ -303,7 +307,7 @@ public class QueryCompiler {
                     JoinSpec joinSpec = joinSpecs.get(i);
                     Scan subScan = ScanUtil.newScan(originalScan);
                     subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                    subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
+                    subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null, dataPlans);
                     boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
                     if (hasPostReference) {
                         tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
@@ -330,7 +334,8 @@ public class QueryCompiler {
                     hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
                 }
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
-                QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+                QueryPlan dataPlan = dataPlans.get(tableRef);
+                QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true, dataPlan);
                 Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
                 Integer limit = null;
                 Integer offset = null;
@@ -350,7 +355,7 @@ public class QueryCompiler {
                 JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
                 Scan subScan = ScanUtil.newScan(originalScan);
                 StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
+                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null, dataPlans);
                 PTable rhsProjTable;
                 TableRef rhsTableRef;
                 SelectStatement rhs;
@@ -383,7 +388,8 @@ public class QueryCompiler {
                 PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
-                QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+                QueryPlan dataPlan = dataPlans.get(rhsTableRef);
+                QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true, dataPlan);
                 Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
                 Integer limit = null;
                 Integer offset = null;
@@ -420,13 +426,13 @@ public class QueryCompiler {
                 Scan lhsScan = ScanUtil.newScan(originalScan);
                 StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
                 boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
-                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
+                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy, dataPlans);
                 PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
                 boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
 
                 Scan rhsScan = ScanUtil.newScan(originalScan);
                 StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
-                QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
+                QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy, dataPlans);
                 PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
 
                 Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy);
@@ -453,7 +459,7 @@ public class QueryCompiler {
                         joinTable.getStatement().getUdfParseNodes())
                         : NODE_FACTORY.select(joinTable.getStatement(), from, where);
 
-                return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+                return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder, null);
             }
             default:
                 throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
@@ -506,16 +512,16 @@ public class QueryCompiler {
         }
         int maxRows = this.statement.getMaxRows();
         this.statement.setMaxRows(pushDownMaxRows ? maxRows : 0); // overwrite maxRows to avoid its impact on inner queries.
-        QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, dataPlan).compile();
+        QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, null).compile();
         plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
         this.statement.setMaxRows(maxRows); // restore maxRows.
         return plan;
     }
-    
-    protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
+
+    protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan dataPlan) throws SQLException{
         SelectStatement innerSelect = select.getInnerSelectStatement();
         if (innerSelect == null) {
-            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true);
+            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true, dataPlan);
         }
         
         QueryPlan innerPlan = compileSubquery(innerSelect, false);
@@ -530,10 +536,10 @@ public class QueryCompiler {
         context.setCurrentTable(tableRef);
         boolean isInRowKeyOrder = innerPlan.getGroupBy() == GroupBy.EMPTY_GROUP_BY && innerPlan.getOrderBy() == OrderBy.EMPTY_ORDER_BY;
 
-        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder);
+        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder, null);
     }
-    
-    protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder) throws SQLException{
+
+    protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder, QueryPlan dataPlan) throws SQLException{
         PTable projectedTable = null;
         if (this.projectTuples) {
             projectedTable = TupleProjectionCompiler.createProjectedTable(select, context);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/babda325/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 05358d4..73cd69c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -43,6 +43,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -51,12 +52,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.execute.AggregatePlan;
-import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.HashJoinPlan;
-import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.execute.SortMergeJoinPlan;
-import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.*;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.aggregator.Aggregator;
@@ -88,6 +85,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -4458,4 +4456,180 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
         }
     }
 
+    @Test
+    public void testLocalIndexPruningInSortMergeJoin() throws SQLException {
+        verifyLocalIndexPruningWithMultipleTables("SELECT /*+ USE_SORT_MERGE_JOIN*/ *\n" +
+                "FROM T1 JOIN T2 ON T1.A = T2.A\n" +
+                "WHERE T1.A = 'B' and T1.C='C' and T2.A IN ('A','G') and T2.B = 'A' and T2.D = 'D'");
+    }
+
+    @Ignore("Blocked by PHOENIX-4614")
+    @Test
+    public void testLocalIndexPruningInLeftOrInnerHashJoin() throws SQLException {
+        verifyLocalIndexPruningWithMultipleTables("SELECT *\n" +
+                "FROM T1 JOIN T2 ON T1.A = T2.A\n" +
+                "WHERE T1.A = 'B' and T1.C='C' and T2.A IN ('A','G') and T2.B = 'A' and T2.D = 'D'");
+    }
+
+    @Ignore("Blocked by PHOENIX-4614")
+    @Test
+    public void testLocalIndexPruningInRightHashJoin() throws SQLException {
+        verifyLocalIndexPruningWithMultipleTables("SELECT *\n" +
+                "FROM (\n" +
+                "    SELECT A, B, C, D FROM T2 WHERE T2.A IN ('A','G') and T2.B = 'A' and T2.D = 'D'\n" +
+                ") T2\n" +
+                "RIGHT JOIN T1 ON T2.A = T1.A\n" +
+                "WHERE T1.A = 'B' and T1.C='C'");
+    }
+
+    @Test
+    public void testLocalIndexPruningInUinon() throws SQLException {
+        verifyLocalIndexPruningWithMultipleTables("SELECT A, B, C FROM T1\n" +
+                "WHERE A = 'B' and C='C'\n" +
+                "UNION ALL\n" +
+                "SELECT A, B, C FROM T2\n" +
+                "WHERE A IN ('A','G') and B = 'A' and D = 'D'");
+    }
+
+    private void verifyLocalIndexPruningWithMultipleTables(String query) throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T1 (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B CHAR(1) NOT NULL,\n" +
+                    "    C CHAR(1) NOT NULL,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX1 ON T1(A,C)");
+            conn.createStatement().execute("CREATE TABLE T2 (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B CHAR(1) NOT NULL,\n" +
+                    "    C CHAR(1) NOT NULL,\n" +
+                    "    D CHAR(1) NOT NULL,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C,\n" +
+                    "        D\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX2 ON T2(A,B,D)");
+            PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            List<QueryPlan> childPlans = plan.accept(new MultipleChildrenExtractor());
+            assertEquals(2, childPlans.size());
+            // Check left child
+            assertEquals("IDX1", childPlans.get(0).getContext().getCurrentTable().getTable().getName().getString());
+            childPlans.get(0).iterator();
+            List<List<Scan>> outerScansL = childPlans.get(0).getScans();
+            assertEquals(1, outerScansL.size());
+            List<Scan> innerScansL = outerScansL.get(0);
+            assertEquals(1, innerScansL.size());
+            Scan scanL = innerScansL.get(0);
+            assertEquals("A", Bytes.toString(scanL.getStartRow()).trim());
+            assertEquals("C", Bytes.toString(scanL.getStopRow()).trim());
+            // Check right child
+            assertEquals("IDX2", childPlans.get(1).getContext().getCurrentTable().getTable().getName().getString());
+            childPlans.get(1).iterator();
+            List<List<Scan>> outerScansR = childPlans.get(1).getScans();
+            assertEquals(2, outerScansR.size());
+            List<Scan> innerScansR1 = outerScansR.get(0);
+            assertEquals(1, innerScansR1.size());
+            Scan scanR1 = innerScansR1.get(0);
+            assertEquals("A", Bytes.toString(scanR1.getStartRow()).trim());
+            assertEquals("C", Bytes.toString(scanR1.getStopRow()).trim());
+            List<Scan> innerScansR2 = outerScansR.get(1);
+            assertEquals(1, innerScansR2.size());
+            Scan scanR2 = innerScansR2.get(0);
+            assertEquals("G", Bytes.toString(scanR2.getStartRow()).trim());
+            assertEquals("I", Bytes.toString(scanR2.getStopRow()).trim());
+        }
+    }
+
+    private static class MultipleChildrenExtractor implements QueryPlanVisitor<List<QueryPlan>> {
+
+        @Override
+        public List<QueryPlan> defaultReturn(QueryPlan plan) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<QueryPlan> visit(AggregatePlan plan) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<QueryPlan> visit(ScanPlan plan) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<QueryPlan> visit(ClientAggregatePlan plan) {
+            return plan.getDelegate().accept(this);
+        }
+
+        @Override
+        public List<QueryPlan> visit(ClientScanPlan plan) {
+            return plan.getDelegate().accept(this);
+        }
+
+        @Override
+        public List<QueryPlan> visit(LiteralResultIterationPlan plan) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<QueryPlan> visit(TupleProjectionPlan plan) {
+            return plan.getDelegate().accept(this);
+        }
+
+        @Override
+        public List<QueryPlan> visit(HashJoinPlan plan) {
+            List<QueryPlan> children = new ArrayList<QueryPlan>(plan.getSubPlans().length + 1);
+            children.add(plan.getDelegate());
+            for (HashJoinPlan.SubPlan subPlan : plan.getSubPlans()) {
+                children.add(subPlan.getInnerPlan());
+            }
+            return children;
+        }
+
+        @Override
+        public List<QueryPlan> visit(SortMergeJoinPlan plan) {
+            return Lists.newArrayList(plan.getLhsPlan(), plan.getRhsPlan());
+        }
+
+        @Override
+        public List<QueryPlan> visit(UnionPlan plan) {
+            return plan.getSubPlans();
+        }
+
+        @Override
+        public List<QueryPlan> visit(UnnestArrayPlan plan) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<QueryPlan> visit(CorrelatePlan plan) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<QueryPlan> visit(CursorFetchPlan plan) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<QueryPlan> visit(ListJarsQueryPlan plan) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<QueryPlan> visit(TraceQueryPlan plan) {
+            return Collections.emptyList();
+        }
+    }
 }


[6/9] phoenix git commit: PHOENIX-1556 Base hash versus sort merge join decision on cost

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index a15ab35..21cbc2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -35,6 +35,10 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.aggregator.Aggregators;
@@ -90,25 +94,30 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
 
     @Override
     public Cost getCost() {
-        Long byteCount = null;
-        try {
-            byteCount = getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
-
-        if (byteCount == null) {
+        Double outputBytes = this.accept(new ByteCountVisitor());
+        Double inputRows = this.getDelegate().accept(new RowCountVisitor());
+        Double rowWidth = this.accept(new AvgRowWidthVisitor());
+        if (inputRows == null || outputBytes == null || rowWidth == null) {
             return Cost.UNKNOWN;
         }
+        double inputBytes = inputRows * rowWidth;
+        double rowsBeforeHaving = RowCountVisitor.aggregate(
+                RowCountVisitor.filter(
+                        inputRows.doubleValue(),
+                        RowCountVisitor.stripSkipScanFilter(
+                                context.getScan().getFilter())),
+                groupBy);
+        double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
+        double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+        double bytesAfterHaving = rowWidth * rowsAfterHaving;
 
         int parallelLevel = CostUtil.estimateParallelLevel(
                 false, context.getConnection().getQueryServices());
-        Cost cost = CostUtil.estimateAggregateCost(byteCount,
-                groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel);
+        Cost cost = CostUtil.estimateAggregateCost(
+                inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            double outputBytes = CostUtil.estimateAggregateOutputBytes(
-                    byteCount, groupBy, clientAggregators.getEstimatedByteSize());
-            Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+            Cost orderByCost = CostUtil.estimateOrderByCost(
+                    bytesAfterHaving, outputBytes, parallelLevel);
             cost = cost.plus(orderByCost);
         }
         return super.getCost().plus(cost);
@@ -210,7 +219,16 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
     public GroupBy getGroupBy() {
         return groupBy;
     }
-    
+
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    public Expression getHaving() {
+        return having;
+    }
+
     private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
         private final List<Expression> groupByExpressions;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
index ac43919..75ba8f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -85,4 +85,8 @@ public abstract class ClientProcessingPlan extends DelegateQueryPlan {
     public FilterableStatement getStatement() {
         return statement;
     }
+
+    public Expression getWhere() {
+        return where;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 5799990..3427f5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -26,6 +26,8 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -53,28 +55,30 @@ public class ClientScanPlan extends ClientProcessingPlan {
 
     @Override
     public Cost getCost() {
-        Long byteCount = null;
-        try {
-            byteCount = getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
+        Double inputBytes = this.getDelegate().accept(new ByteCountVisitor());
+        Double outputBytes = this.accept(new ByteCountVisitor());
 
-        if (byteCount == null) {
+        if (inputBytes == null || outputBytes == null) {
             return Cost.UNKNOWN;
         }
 
-        Cost cost = new Cost(0, 0, byteCount);
         int parallelLevel = CostUtil.estimateParallelLevel(
                 false, context.getConnection().getQueryServices());
+        Cost cost = new Cost(0, 0, 0);
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+            Cost orderByCost =
+                    CostUtil.estimateOrderByCost(inputBytes, outputBytes, parallelLevel);
             cost = cost.plus(orderByCost);
         }
         return super.getCost().plus(cost);
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 270ad3d..e3e0264 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -28,6 +28,9 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.optimize.Cost;
@@ -202,19 +205,18 @@ public class CorrelatePlan extends DelegateQueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    public QueryPlan getRhsPlan() {
+        return rhs;
+    }
+
+    @Override
     public Cost getCost() {
-        Long lhsByteCount = null;
-        try {
-            lhsByteCount = delegate.getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
-        Long rhsRowCount = null;
-        try {
-            rhsRowCount = rhs.getEstimatedRowsToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
+        Double lhsByteCount = delegate.accept(new ByteCountVisitor());
+        Double rhsRowCount = rhs.accept(new RowCountVisitor());
 
         if (lhsByteCount == null || rhsRowCount == null) {
             return Cost.UNKNOWN;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
index cf0a3cf..0ecf74d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.CursorResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -51,6 +52,11 @@ public class CursorFetchPlan extends DelegateQueryPlan {
 	    return resultIterator;
 	}
 
+	@Override
+	public <T> T accept(QueryPlanVisitor<T> visitor) {
+		return visitor.visit(this);
+	}
+
 
 	@Override
 	public ExplainPlan getExplainPlan() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 23a0da6..6ade42e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -48,6 +48,9 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
@@ -63,10 +66,7 @@ import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.ParseNode;
-import org.apache.phoenix.parse.SQLParser;
-import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.*;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -77,6 +77,7 @@ import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.SQLCloseables;
 
 import com.google.common.collect.Lists;
@@ -92,6 +93,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final boolean recompileWhereClause;
     private final Set<TableRef> tableRefs;
     private final int maxServerCacheTimeToLive;
+    private final long serverCacheLimit;
     private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap();
     private HashCacheClient hashClient;
     private AtomicLong firstJobEndTime;
@@ -132,8 +134,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         for (SubPlan subPlan : subPlans) {
             tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs());
         }
-        this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt(
+        QueryServices services = plan.getContext().getConnection().getQueryServices();
+        this.maxServerCacheTimeToLive = services.getProps().getInt(
                 QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+        this.serverCacheLimit = services.getProps().getLong(
+                QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
     }
     
     @Override
@@ -270,40 +275,101 @@ public class HashJoinPlan extends DelegateQueryPlan {
         return statement;
     }
 
+    public HashJoinInfo getJoinInfo() {
+        return joinInfo;
+    }
+
+    public SubPlan[] getSubPlans() {
+        return subPlans;
+    }
+
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
     @Override
     public Cost getCost() {
-        Long byteCount = null;
         try {
-            byteCount = getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
+            Long r = delegate.getEstimatedRowsToScan(); // LHS row estimate; may be null
+            Double w = delegate.accept(new AvgRowWidthVisitor()); // LHS average row width; may be null
+            if (r == null || w == null) {
+                return Cost.UNKNOWN;
+            }
 
-        if (byteCount == null) {
-            return Cost.UNKNOWN;
-        }
+            int parallelLevel = CostUtil.estimateParallelLevel(
+                    true, getContext().getConnection().getQueryServices());
+
+            double rowWidth = w;
+            double rows = RowCountVisitor.filter(
+                    r.doubleValue(),
+                    RowCountVisitor.stripSkipScanFilter(
+                            delegate.getContext().getScan().getFilter()));
+            double bytes = rowWidth * rows;
+            Cost cost = Cost.ZERO;
+            double rhsByteSum = 0.0;
+            for (int i = 0; i < subPlans.length; i++) {
+                double lhsBytes = bytes;
+                Double rhsRows = subPlans[i].getInnerPlan().accept(new RowCountVisitor());
+                Double rhsWidth = subPlans[i].getInnerPlan().accept(new AvgRowWidthVisitor());
+                if (rhsRows == null || rhsWidth == null) {
+                    return Cost.UNKNOWN;
+                }
+                double rhsBytes = rhsWidth * rhsRows;
+                rows = RowCountVisitor.join(rows, rhsRows, joinInfo.getJoinTypes()[i]);
+                rowWidth = AvgRowWidthVisitor.join(rowWidth, rhsWidth, joinInfo.getJoinTypes()[i]);
+                bytes = rowWidth * rows;
+                cost = cost.plus(CostUtil.estimateHashJoinCost(
+                        lhsBytes, rhsBytes, bytes, subPlans[i].hasKeyRangeExpression(), parallelLevel));
+                rhsByteSum += rhsBytes;
+            }
 
-        Cost cost = new Cost(0, 0, byteCount);
-        Cost lhsCost = delegate.getCost();
-        if (keyRangeExpressions != null) {
-            // The selectivity of the dynamic rowkey filter.
-            // TODO replace the constant with an estimate value.
-            double selectivity = 0.01;
-            lhsCost = lhsCost.multiplyBy(selectivity);
-        }
-        Cost rhsCost = Cost.ZERO;
-        for (SubPlan subPlan : subPlans) {
-            rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+            if (rhsByteSum > serverCacheLimit) { // estimated RHS total exceeds the configured server cache size
+                return Cost.UNKNOWN;
+            }
+
+            // Calculate the cost of aggregation and ordering that is performed with the HashJoinPlan
+            if (delegate instanceof AggregatePlan) {
+                AggregatePlan aggPlan = (AggregatePlan) delegate;
+                double rowsBeforeHaving = RowCountVisitor.aggregate(rows, aggPlan.getGroupBy());
+                double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, aggPlan.getHaving());
+                double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+                double bytesAfterHaving = rowWidth * rowsAfterHaving;
+                Cost aggCost = CostUtil.estimateAggregateCost(
+                        bytes, bytesBeforeHaving, aggPlan.getGroupBy(), parallelLevel);
+                cost = cost.plus(aggCost);
+                rows = rowsAfterHaving;
+                bytes = bytesAfterHaving;
+            }
+            double outputRows = RowCountVisitor.limit(rows, delegate.getLimit());
+            double outputBytes = rowWidth * outputRows;
+            if (!delegate.getOrderBy().getOrderByExpressions().isEmpty()) {
+                int parallelLevel2 = CostUtil.estimateParallelLevel(
+                        delegate instanceof ScanPlan, getContext().getConnection().getQueryServices());
+                Cost orderByCost = CostUtil.estimateOrderByCost(
+                        bytes, outputBytes, parallelLevel2); // use the order-by specific level computed above
+                cost = cost.plus(orderByCost);
+            }
+
+            // Calculate the cost of child nodes
+            Cost lhsCost = new Cost(0, 0, r.doubleValue() * w);
+            Cost rhsCost = Cost.ZERO;
+            for (SubPlan subPlan : subPlans) {
+                rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+            }
+            return cost.plus(lhsCost).plus(rhsCost);
+        } catch (SQLException e) { // deliberately ignored: fall through and report Cost.UNKNOWN
         }
-        return cost.plus(lhsCost).plus(rhsCost);
+        return Cost.UNKNOWN;
     }
 
-    protected interface SubPlan {
+    public interface SubPlan {
         public ServerCache execute(HashJoinPlan parent) throws SQLException;
         public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
         public List<String> getPreSteps(HashJoinPlan parent) throws SQLException;
         public List<String> getPostSteps(HashJoinPlan parent) throws SQLException;
         public QueryPlan getInnerPlan();
+        public boolean hasKeyRangeExpression();
     }
     
     public static class WhereClauseSubPlan implements SubPlan {
@@ -383,6 +449,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         public QueryPlan getInnerPlan() {
             return plan;
         }
+
+        @Override
+        public boolean hasKeyRangeExpression() {
+            return false;
+        }
     }
     
     public static class HashSubPlan implements SubPlan {        
@@ -495,6 +566,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         public QueryPlan getInnerPlan() {
             return plan;
         }
+
+        @Override
+        public boolean hasKeyRangeExpression() {
+            return keyRangeLhsExpression != null;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index c9abb69..255fca3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -81,6 +82,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, final Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
         ResultIterator scanner = new ResultIterator() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d63950c..ed145a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -37,6 +37,8 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.BaseResultIterators;
@@ -202,16 +204,17 @@ public class ScanPlan extends BaseQueryPlan {
         } catch (SQLException e) {
             // ignored.
         }
+        Double outputBytes = this.accept(new ByteCountVisitor());
 
-        if (byteCount == null) {
+        if (byteCount == null || outputBytes == null) {
             return Cost.UNKNOWN;
         }
 
-        Cost cost = new Cost(0, 0, byteCount);
         int parallelLevel = CostUtil.estimateParallelLevel(
                 true, context.getConnection().getQueryServices());
+        Cost cost = new Cost(0, 0, byteCount);
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+            Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, outputBytes, parallelLevel);
             cost = cost.plus(orderByCost);
         }
         return cost;
@@ -320,6 +323,11 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     public Long getEstimatedRowsToScan() throws SQLException {
         if (isSerial) {
             return serialRowsEstimate;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 2436d1e..978c7b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -47,6 +47,8 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.MappedByteBufferQueue;
@@ -171,12 +173,7 @@ public class SortMergeJoinPlan implements QueryPlan {
 
     @Override
     public Cost getCost() {
-        Long byteCount = null;
-        try {
-            byteCount = getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
+        Double byteCount = this.accept(new ByteCountVisitor());
 
         if (byteCount == null) {
             return Cost.UNKNOWN;
@@ -255,7 +252,11 @@ public class SortMergeJoinPlan implements QueryPlan {
     public boolean isRowKeyOrdered() {
         return false;
     }
-    
+
+    public JoinType getJoinType() {
+        return type;
+    }
+
     private static SQLException closeIterators(ResultIterator lhsIterator, ResultIterator rhsIterator) {
         SQLException e = null;
         try {
@@ -717,6 +718,11 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     public Set<TableRef> getSourceRefs() {
         return tableRefs;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index f42af56..f869a4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.FilterResultIterator;
@@ -78,4 +79,9 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
         
         return iterator;
     }
+
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 3b5168c..6114d66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -112,6 +113,10 @@ public class UnionPlan implements QueryPlan {
         return iterators.getScans();
     }
 
+    public List<QueryPlan> getSubPlans() {
+        return plans;
+    }
+
     @Override
     public GroupBy getGroupBy() {
         return groupBy;
@@ -230,7 +235,12 @@ public class UnionPlan implements QueryPlan {
         return false;
     }
 
-	@Override
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
 	public Operation getOperation() {
 		return statement.getOperation();
 	}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 51cb67e..0bc3df4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.BaseSingleExpression;
 import org.apache.phoenix.expression.BaseTerminalExpression;
 import org.apache.phoenix.expression.Expression;
@@ -64,6 +65,11 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
         return null;
     }
 
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
     public class UnnestArrayResultIterator extends DelegateResultIterator {
         private final UnnestArrayElemRefExpression elemRefExpression;
         private final UnnestArrayElemIndexExpression elemIndexExpression;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
new file mode 100644
index 0000000..9525747
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.parse.JoinTableNode;
+
+import java.sql.SQLException;
+
+/**
+ * Implementation of QueryPlanVisitor used to get the average number of bytes in each
+ * row for a QueryPlan.
+ */
+public class AvgRowWidthVisitor implements QueryPlanVisitor<Double> {
+
+    @Override
+    public Double defaultReturn(QueryPlan plan) {
+        return null;
+    }
+
+    @Override
+    public Double visit(AggregatePlan plan) { // width = estimated bytes / estimated rows
+        try {
+            Long byteCount = plan.getEstimatedBytesToScan();
+            Long rowCount = plan.getEstimatedRowsToScan();
+            if (byteCount != null && rowCount != null) {
+                if (byteCount == 0) { // checked first, so zero bytes yields 0.0 even for zero rows
+                    return 0.0;
+                }
+                if (rowCount != 0) { // a zero row count would make the ratio infinite
+                    return ((double) byteCount) / rowCount;
+                }
+            }
+        } catch (SQLException e) { // deliberately ignored: falls through to the null result
+        }
+
+        return null; // unknown: an estimate is missing, rows are 0 with bytes > 0, or lookup failed
+    }
+
+    @Override
+    public Double visit(ScanPlan plan) { // width = estimated bytes / estimated rows
+        try {
+            Long byteCount = plan.getEstimatedBytesToScan();
+            Long rowCount = plan.getEstimatedRowsToScan();
+            if (byteCount != null && rowCount != null) {
+                if (byteCount == 0) { // checked first, so zero bytes yields 0.0 even for zero rows
+                    return 0.0;
+                }
+                if (rowCount != 0) { // a zero row count would make the ratio infinite
+                    return ((double) byteCount) / rowCount;
+                }
+            }
+        } catch (SQLException e) { // deliberately ignored: falls through to the null result
+        }
+
+        return null; // unknown: an estimate is missing, rows are 0 with bytes > 0, or lookup failed
+    }
+
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(LiteralResultIterationPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        Double lhsWidth = plan.getDelegate().accept(this);
+        if (lhsWidth == null) {
+            return null;
+        }
+        JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes();
+        HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans();
+        Double width = lhsWidth;
+        for (int i = 0; i < joinTypes.length; i++) {
+            Double rhsWidth = subPlans[i].getInnerPlan().accept(this);
+            if (rhsWidth == null) {
+                return null;
+            }
+            width = join(width, rhsWidth, joinTypes[i]);
+        }
+
+        return width;
+    }
+
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        Double lhsWidth = plan.getLhsPlan().accept(this);
+        Double rhsWidth = plan.getRhsPlan().accept(this);
+        if (lhsWidth == null || rhsWidth == null) {
+            return null;
+        }
+
+        return join(lhsWidth, rhsWidth, plan.getJoinType());
+    }
+
+    @Override
+    public Double visit(UnionPlan plan) {
+        Double sum = 0.0;
+        for (QueryPlan subPlan : plan.getSubPlans()) {
+            Double avgWidth = subPlan.accept(this);
+            if (avgWidth == null) {
+                return null;
+            }
+            sum += avgWidth;
+        }
+
+        return sum / plan.getSubPlans().size();
+    }
+
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(ListJarsQueryPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+    @Override
+    public Double visit(TraceQueryPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+
+    /*
+     * The methods below estimate the row width based on the input row widths as well as
+     * the operator.
+     */
+
+    public static double join(double lhsWidth, double rhsWidth, JoinTableNode.JoinType type) {
+        double width;
+        switch (type) {
+            case Inner:
+            case Left:
+            case Right:
+            case Full: {
+                width = lhsWidth + rhsWidth;
+                break;
+            }
+            case Semi:
+            case Anti: {
+                width = lhsWidth;
+                break;
+            }
+            default: {
+                throw new IllegalArgumentException("Invalid join type: " + type);
+            }
+        }
+        return width;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
new file mode 100644
index 0000000..61a2895
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+
+/**
+ * Implementation of QueryPlanVisitor used to get the number of output bytes for a QueryPlan.
+ */
+public class ByteCountVisitor implements QueryPlanVisitor<Double> {
+
+    @Override
+    public Double defaultReturn(QueryPlan plan) {
+        return null;
+    }
+
+    @Override
+    public Double visit(AggregatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ScanPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(LiteralResultIterationPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(UnionPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(ListJarsQueryPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(TraceQueryPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    protected Double getByteCountFromRowCountAndRowWidth(QueryPlan plan) {
+        Double rowCount = plan.accept(new RowCountVisitor());
+        Double rowWidth = plan.accept(new AvgRowWidthVisitor());
+        if (rowCount == null || rowWidth == null) {
+            return null;
+        }
+
+        return rowCount * rowWidth;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
new file mode 100644
index 0000000..a7ae3af
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.*;
+
+/**
+ *
+ * Visitor for a QueryPlan (which may contain other nested query-plans)
+ *
+ */
+public interface QueryPlanVisitor<E> {
+    E defaultReturn(QueryPlan plan);
+    E visit(AggregatePlan plan);
+    E visit(ScanPlan plan);
+    E visit(ClientAggregatePlan plan);
+    E visit(ClientScanPlan plan);
+    E visit(LiteralResultIterationPlan plan);
+    E visit(TupleProjectionPlan plan);
+    E visit(HashJoinPlan plan);
+    E visit(SortMergeJoinPlan plan);
+    E visit(UnionPlan plan);
+    E visit(UnnestArrayPlan plan);
+    E visit(CorrelatePlan plan);
+    E visit(CursorFetchPlan plan);
+    E visit(ListJarsQueryPlan plan);
+    E visit(TraceQueryPlan plan);
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
new file mode 100644
index 0000000..58ceea9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.phoenix.compile.GroupByCompiler;
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.filter.BooleanExpressionFilter;
+import org.apache.phoenix.parse.JoinTableNode;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of QueryPlanVisitor used to get the number of output rows for a QueryPlan.
+ */
+public class RowCountVisitor implements QueryPlanVisitor<Double> {
+
+    // An estimate of the ratio of result data from group-by against the input data.
+    private final static double GROUPING_FACTOR = 0.1;
+
+    private final static double OUTER_JOIN_FACTOR = 1.15;
+    private final static double INNER_JOIN_FACTOR = 0.85;
+    private final static double SEMI_OR_ANTI_JOIN_FACTOR = 0.5;
+
+    private final static double UNION_DISTINCT_FACTOR = 0.8;
+
+    @Override
+    public Double defaultReturn(QueryPlan plan) {
+        return null;
+    }
+
+    @Override
+    public Double visit(AggregatePlan plan) {
+        try {
+            Long b = plan.getEstimatedRowsToScan();
+            if (b != null) {
+                return limit( // read inside-out: scan filter, group-by, HAVING, then LIMIT
+                        filter(
+                                aggregate(
+                                        filter(
+                                                b.doubleValue(),
+                                                stripSkipScanFilter(
+                                                        plan.getContext().getScan().getFilter())),
+                                        plan.getGroupBy()),
+                                plan.getHaving()),
+                        plan.getLimit());
+            }
+        } catch (SQLException e) { // deliberately ignored: falls through to the null result
+        }
+
+        return null; // row count unknown: no scan estimate, or the lookup failed
+    }
+
+    @Override
+    public Double visit(ScanPlan plan) {
+        try {
+            Long b = plan.getEstimatedRowsToScan();
+            if (b != null) {
+                return limit( // read inside-out: scan filter first, then LIMIT
+                        filter(
+                                b.doubleValue(),
+                                stripSkipScanFilter(plan.getContext().getScan().getFilter())),
+                        plan.getLimit());
+            }
+        } catch (SQLException e) { // deliberately ignored: falls through to the null result
+        }
+
+        return null; // row count unknown: no scan estimate, or the lookup failed
+    }
+
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        Double b = plan.getDelegate().accept(this);
+        if (b != null) {
+            return limit(
+                    filter(
+                            aggregate(
+                                    filter(b.doubleValue(), plan.getWhere()),
+                                    plan.getGroupBy()),
+                            plan.getHaving()),
+                    plan.getLimit());
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        if (plan.getLimit() != null) {
+            return (double) plan.getLimit();
+        }
+        Double b = plan.getDelegate().accept(this);
+        if (b != null) {
+            return limit(
+                    filter(b.doubleValue(), plan.getWhere()),
+                    plan.getLimit());
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(LiteralResultIterationPlan plan) {
+        return 1.0;
+    }
+
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        try {
+            QueryPlan lhsPlan = plan.getDelegate();
+            Long b = lhsPlan.getEstimatedRowsToScan(); // scan estimate of the delegate (left) plan
+            if (b == null) {
+                return null;
+            }
+
+            Double rows = filter(b.doubleValue(),
+                    stripSkipScanFilter(lhsPlan.getContext().getScan().getFilter()));
+            JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes();
+            HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans();
+            for (int i = 0; i < joinTypes.length; i++) {
+                Double rhsRows = subPlans[i].getInnerPlan().accept(this);
+                if (rhsRows == null) { // one unknown sub-plan makes the whole estimate unknown
+                    return null;
+                }
+                rows = join(rows, rhsRows.doubleValue(), joinTypes[i]); // fold into running total
+            }
+            if (lhsPlan instanceof AggregatePlan) { // delegate's group-by/HAVING applied after joins
+                AggregatePlan aggPlan = (AggregatePlan) lhsPlan;
+                rows = filter(aggregate(rows, aggPlan.getGroupBy()), aggPlan.getHaving());
+            }
+            return limit(rows, lhsPlan.getLimit());
+        } catch (SQLException e) { // deliberately ignored: falls through to the null result
+        }
+
+        return null; // row count unknown: the estimate lookup failed
+    }
+
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        Double lhsRows = plan.getLhsPlan().accept(this);
+        Double rhsRows = plan.getRhsPlan().accept(this);
+        if (lhsRows != null && rhsRows != null) {
+            return join(lhsRows, rhsRows, plan.getJoinType());
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(UnionPlan plan) {
+        int count = plan.getSubPlans().size();
+        double[] inputRows = new double[count];
+        for (int i = 0; i < count; i++) {
+            Double b = plan.getSubPlans().get(i).accept(this);
+            if (b != null) {
+                inputRows[i] = b.doubleValue();
+            } else {
+                return null;
+            }
+        }
+
+        return limit(union(true, inputRows),plan.getLimit());
+    }
+
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        Double lhsRows = plan.getDelegate().accept(this);
+        if (lhsRows != null) {
+            return lhsRows * SEMI_OR_ANTI_JOIN_FACTOR;
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(ListJarsQueryPlan plan) {
+        return 0.0;
+    }
+
+    @Override
+    public Double visit(TraceQueryPlan plan) {
+        return 0.0;
+    }
+
+    public static Filter stripSkipScanFilter(Filter filter) {
+        if (filter == null) {
+            return null;
+        }
+        if (!(filter instanceof FilterList)) {
+            return filter instanceof BooleanExpressionFilter ? filter : null;
+        }
+        FilterList filterList = (FilterList) filter;
+        if (filterList.getOperator() != FilterList.Operator.MUST_PASS_ALL) {
+            return filter;
+        }
+        List<Filter> list = new ArrayList<>();
+        for (Filter f : filterList.getFilters()) {
+            Filter stripped = stripSkipScanFilter(f);
+            if (stripped != null) {
+                list.add(stripped);
+            }
+        }
+        return list.isEmpty() ? null : (list.size() == 1 ? list.get(0) : new FilterList(FilterList.Operator.MUST_PASS_ALL, list));
+    }
+
+
+    /*
+     * The methods below estimate the row count from the input row count and the operator.
+     * They should be replaced by a more accurate calculation based on histograms; a logical
+     * operator layer is expected to facilitate this.
+     */
+
+    public static double filter(double inputRows, Filter filter) {
+        if (filter == null) {
+            return inputRows;
+        }
+        return 0.5 * inputRows;
+    }
+
+    public static double filter(double inputRows, Expression filter) {
+        if (filter == null) {
+            return inputRows;
+        }
+        return 0.5 * inputRows;
+    }
+
+    public static double aggregate(double inputRows, GroupByCompiler.GroupBy groupBy) {
+        if (groupBy.isUngroupedAggregate()) {
+            return 1.0;
+        }
+        return GROUPING_FACTOR * inputRows;
+    }
+
+    /** Estimates the rows left after a LIMIT: the input row count capped at {@code limit}. */
+    public static double limit(double inputRows, Integer limit) {
+        // A null limit means no LIMIT applies, so the input estimate passes through unchanged.
+        // Otherwise cap it: a LIMIT can never produce more rows than its input holds.
+        return limit == null ? inputRows : Math.min(inputRows, limit);
+    }
+
+    public static double join(double lhsRows, double[] rhsRows, JoinTableNode.JoinType[] types) {
+        assert rhsRows.length == types.length;
+        double rows = lhsRows;
+        for (int i = 0; i < rhsRows.length; i++) {
+            rows = join(rows, rhsRows[i], types[i]);
+        }
+        return rows;
+    }
+
+    public static double join(double lhsRows, double rhsRows, JoinTableNode.JoinType type) {
+        double rows;
+        switch (type) {
+            case Inner: {
+                rows = Math.min(lhsRows, rhsRows);
+                rows = rows * INNER_JOIN_FACTOR;
+                break;
+            }
+            case Left:
+            case Right:
+            case Full: {
+                rows = Math.max(lhsRows, rhsRows);
+                rows = rows * OUTER_JOIN_FACTOR;
+                break;
+            }
+            case Semi:
+            case Anti: {
+                rows = lhsRows * SEMI_OR_ANTI_JOIN_FACTOR;
+                break;
+            }
+            default: {
+                throw new IllegalArgumentException("Invalid join type: " + type);
+            }
+        }
+        return rows;
+    }
+
+    public static double union(boolean all, double... inputRows) {
+        double rows = 0.0;
+        for (double d : inputRows) {
+            rows += d;
+        }
+        if (!all) {
+            rows *= UNION_DISTINCT_FACTOR;
+        }
+        return rows;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index da8beae..a55af6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -86,6 +86,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.exception.UpgradeRequiredException;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
@@ -731,6 +732,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 }
 
                 @Override
+                public <T> T accept(QueryPlanVisitor<T> visitor) {
+                    return visitor.defaultReturn(this);
+                }
+
+                @Override
                 public Long getEstimatedRowsToScan() {
                     return estimatedRows;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
index 1d4b8e0..db2b5ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
@@ -30,51 +30,52 @@ import org.apache.phoenix.query.QueryServices;
  */
 public class CostUtil {
 
-    // An estimate of the ratio of result data from group-by against the input data.
-    private final static double GROUPING_FACTOR = 0.1;
-
-    // Io operations conducted in intermediate evaluations like sorting or aggregation
-    // should be counted twice since they usually involve both read and write.
-    private final static double IO_COST_MULTIPLIER = 2.0;
-
     /**
-     * Estimate the number of output bytes of an aggregate.
-     * @param byteCount the number of input bytes
+     * Estimate the cost of an aggregate.
+     * @param inputBytes the number of input bytes
+     * @param outputBytes the number of output bytes
      * @param groupBy the compiled GroupBy object
-     * @param aggregatorsSize the byte size of aggregators
-     * @return the output byte count
+     * @param parallelLevel number of parallel workers or threads
+     * @return the cost
      */
-    public static double estimateAggregateOutputBytes(
-            double byteCount, GroupBy groupBy, int aggregatorsSize) {
-        if (groupBy.isUngroupedAggregate()) {
-            return aggregatorsSize;
-        }
-        return byteCount * GROUPING_FACTOR;
+    public static Cost estimateAggregateCost(
+            double inputBytes, double outputBytes, GroupBy groupBy, int parallelLevel) {
+        double hashMapOverhead = groupBy.isOrderPreserving() || groupBy.isUngroupedAggregate() ? 1 : (outputBytes < 1 ? 1 : outputBytes);
+        return new Cost(0, 0, (outputBytes + hashMapOverhead * Math.log(inputBytes)) / parallelLevel);
     }
 
     /**
-     * Estimate the cost of an aggregate.
-     * @param byteCount the number of input bytes
-     * @param groupBy the compiled GroupBy object
-     * @param aggregatorsSize the byte size of aggregators
+     * Estimate the cost of an order-by
+     * @param inputBytes the number of input bytes
+     * @param outputBytes the number of output bytes, which may be different from inputBytes
+     *                    depending on whether there is a LIMIT
      * @param parallelLevel number of parallel workers or threads
      * @return the cost
      */
-    public static Cost estimateAggregateCost(
-            double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) {
-        double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize);
-        double orderedFactor = groupBy.isOrderPreserving() ? 0.2 : 1.0;
-        return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel);
+    public static Cost estimateOrderByCost(double inputBytes, double outputBytes, int parallelLevel) {
+        if (inputBytes < 1) {
+            inputBytes = 1;
+        }
+        return new Cost(0, 0,
+                (outputBytes + outputBytes * Math.log(inputBytes)) / parallelLevel);
     }
 
     /**
-     * Estimate the cost of an order-by
-     * @param byteCount the number of input bytes
+     * Estimate the cost of a hash-join
+     * @param lhsBytes the number of left input bytes
+     * @param rhsBytes the number of right input bytes
+     * @param outputBytes the number of output bytes
      * @param parallelLevel number of parallel workers or threads
      * @return the cost
      */
-    public static Cost estimateOrderByCost(double byteCount, int parallelLevel) {
-        return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel);
+    public static Cost estimateHashJoinCost(
+            double lhsBytes, double rhsBytes, double outputBytes,
+            boolean hasKeyRangeExpression, int parallelLevel) {
+        if (rhsBytes < 1) {
+            rhsBytes = 1;
+        }
+        return new Cost(0, 0,
+                (rhsBytes * Math.log(rhsBytes) + (hasKeyRangeExpression ? 0 : lhsBytes)) / parallelLevel + outputBytes);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 1903dda..69aeaad 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.iterate.ParallelIterators;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -474,6 +475,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             }
 
             @Override
+            public <T> T accept(QueryPlanVisitor<T> visitor) {
+                return visitor.defaultReturn(this);
+            }
+
+            @Override
             public Long getEstimatedRowsToScan() {
                 return null;
             }


[9/9] phoenix git commit: PHOENIX-4611 Not nullable column impact on join query plans

Posted by ma...@apache.org.
PHOENIX-4611 Not nullable column impact on join query plans


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9bb7811f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9bb7811f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9bb7811f

Branch: refs/heads/4.x-HBase-0.98
Commit: 9bb7811f001d00cea42da6185c3645d7d14e4a16
Parents: babda32
Author: maryannxue <ma...@gmail.com>
Authored: Fri Feb 16 21:03:46 2018 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 21:31:58 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java | 8 ++++----
 .../java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java | 4 ++--
 .../main/java/org/apache/phoenix/compile/JoinCompiler.java   | 4 ++++
 3 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb7811f/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
index a132728..dea349a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
@@ -440,8 +440,8 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
                         "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                         "    CLIENT MERGE SORT\n" +
                         "    CLIENT SORTED BY [BUCKET, \"TIMESTAMP\"]\n" +
-                        "CLIENT SORTED BY [E.BUCKET, E.TIMESTAMP]\n" +
-                        "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, E.TIMESTAMP]"
+                        "CLIENT SORTED BY [E.TIMESTAMP, E.BUCKET]\n" +
+                        "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.TIMESTAMP, E.BUCKET]"
                         :
                         "SORT-MERGE-JOIN (INNER) TABLES\n" +
                         "    CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + eventCountTableName + " [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
@@ -456,8 +456,8 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
                         "        SERVER DISTINCT PREFIX FILTER OVER [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                         "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                         "    CLIENT MERGE SORT\n" +
-                        "CLIENT SORTED BY [E.BUCKET, E.TIMESTAMP]\n" +
-                        "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, E.TIMESTAMP]";
+                        "CLIENT SORTED BY [E.TIMESTAMP, E.BUCKET]\n" +
+                        "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.TIMESTAMP, E.BUCKET]";
                 
                 ResultSet rs = conn.createStatement().executeQuery("explain " + q);
                 assertEquals(p, QueryUtil.getExplainPlan(rs));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb7811f/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
index f09f1d3..3a1b015 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
@@ -786,7 +786,7 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
                 String p = i == 0 ?
                         "CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER EVENT_COUNT [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
                         "    SERVER FILTER BY FIRST KEY ONLY\n" +
-                        "    SERVER AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, \"E.TIMESTAMP\"]\n" +
+                        "    SERVER AGGREGATE INTO DISTINCT ROWS BY [\"E.TIMESTAMP\", E.BUCKET]\n" +
                         "CLIENT MERGE SORT\n" +
                         "    PARALLEL INNER-JOIN TABLE 0 (SKIP MERGE)\n" +
                         "        CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + t[i] + " [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
@@ -795,7 +795,7 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
                         :
                         "CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER EVENT_COUNT [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
                         "    SERVER FILTER BY FIRST KEY ONLY\n" +
-                        "    SERVER AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, \"E.TIMESTAMP\"]\n" +
+                        "    SERVER AGGREGATE INTO DISTINCT ROWS BY [\"E.TIMESTAMP\", E.BUCKET]\n" +
                         "CLIENT MERGE SORT\n" +
                         "    PARALLEL INNER-JOIN TABLE 0 (SKIP MERGE)\n" +
                         "        CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + t[i] + " [0,'5SEC',1462993420000000001,'Tr/Bal'] - [1,'5SEC',1462993520000000000,'Tr/Bal']\n" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bb7811f/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 4020cf9..cf5a5dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -561,6 +561,10 @@ public class JoinCompiler {
                 }
                 compiled.add(new Pair<Expression, Expression>(left, right));
             }
+            // TODO PHOENIX-4618:
+            // For Strategy.SORT_MERGE, we probably need to re-order the join keys based on the
+            // specific ordering required by the join's parent, or re-order the following way
+            // to align with group-by expressions' re-ordering.
             if (strategy != Strategy.SORT_MERGE) {
                 Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
                     @Override


[4/9] phoenix git commit: PHOENIX-4437 Make QueryPlan.getEstimatedBytesToScan() independent of getExplainPlan() and pull optimize() out of getExplainPlan()

Posted by ma...@apache.org.
PHOENIX-4437 Make QueryPlan.getEstimatedBytesToScan() independent of getExplainPlan() and pull optimize() out of getExplainPlan()


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7ef96fe1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7ef96fe1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7ef96fe1

Branch: refs/heads/4.x-HBase-0.98
Commit: 7ef96fe1bed43f3ac3dae900a3e6a83791faf697
Parents: 977699a
Author: maryannxue <ma...@gmail.com>
Authored: Thu Dec 21 10:31:04 2017 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 20:08:24 2018 -0700

----------------------------------------------------------------------
 .../end2end/ExplainPlanWithStatsEnabledIT.java  |  4 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   | 45 ++++++--------
 .../apache/phoenix/execute/HashJoinPlan.java    | 59 +++++++++---------
 .../phoenix/execute/SortMergeJoinPlan.java      | 63 ++++++++++----------
 .../org/apache/phoenix/execute/UnionPlan.java   | 53 ++++++++--------
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  9 ++-
 6 files changed, 120 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index 76ed7ba..a835e84 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -303,8 +303,8 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(false);
             Estimate info = getByteRowEstimates(conn, sql, binds);
-            assertEquals((Long) 200L, info.estimatedBytes);
-            assertEquals((Long) 2L, info.estimatedRows);
+            assertEquals((Long) 176l, info.estimatedBytes);
+            assertEquals((Long) 2l, info.estimatedRows);
             assertTrue(info.estimateInfoTs > 0);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 38ed926..c4edf31 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -118,7 +118,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
     protected Long estimatedRows;
     protected Long estimatedSize;
     protected Long estimateInfoTimestamp;
-    private boolean explainPlanCalled;
+    private boolean getEstimatesCalled;
     
 
     protected BaseQueryPlan(
@@ -500,32 +500,17 @@ public abstract class BaseQueryPlan implements QueryPlan {
 
     @Override
     public ExplainPlan getExplainPlan() throws SQLException {
-        explainPlanCalled = true;
         if (context.getScanRanges() == ScanRanges.NOTHING) {
             return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString()));
         }
 
-        // If cost-based optimizer is enabled, we need to initialize a dummy iterator to
-        // get the stats for computing costs.
-        boolean costBased =
-                context.getConnection().getQueryServices().getConfiguration().getBoolean(
-                        QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
-        if (costBased) {
-            ResultIterator iterator = iterator();
-            iterator.close();
-        }
-        // Optimize here when getting explain plan, as queries don't get optimized until after compilation
-        QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
-        ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan();
-        if (!costBased) { // do not override estimates if they are used for cost calculation.
-            this.estimatedRows = plan.getEstimatedRowsToScan();
-            this.estimatedSize = plan.getEstimatedBytesToScan();
-            this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
-        }
-        return exp;
+        ResultIterator iterator = iterator();
+        ExplainPlan explainPlan = new ExplainPlan(getPlanSteps(iterator));
+        iterator.close();
+        return explainPlan;
     }
 
-    private List<String> getPlanSteps(ResultIterator iterator){
+    private List<String> getPlanSteps(ResultIterator iterator) {
         List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
         iterator.explain(planSteps);
         return planSteps;
@@ -538,26 +523,32 @@ public abstract class BaseQueryPlan implements QueryPlan {
     
     @Override
     public Long getEstimatedRowsToScan() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimatedRows;
     }
 
     @Override
     public Long getEstimatedBytesToScan() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimatedSize;
     }
 
     @Override
     public Long getEstimateInfoTimestamp() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimateInfoTimestamp;
     }
 
+    private void getEstimates() throws SQLException {
+        getEstimatesCalled = true;
+        // Initialize a dummy iterator to get the estimates based on stats.
+        ResultIterator iterator = iterator();
+        iterator.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2d2ff4e..23a0da6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -99,7 +99,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private Long estimatedRows;
     private Long estimatedBytes;
     private Long estimateInfoTs;
-    private boolean explainPlanCalled;
+    private boolean getEstimatesCalled;
     
     public static HashJoinPlan create(SelectStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException {
@@ -247,7 +247,6 @@ public class HashJoinPlan extends DelegateQueryPlan {
 
     @Override
     public ExplainPlan getExplainPlan() throws SQLException {
-        explainPlanCalled = true;
         List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
         int count = subPlans.length;
         for (int i = 0; i < count; i++) {
@@ -263,26 +262,6 @@ public class HashJoinPlan extends DelegateQueryPlan {
         if (joinInfo != null && joinInfo.getLimit() != null) {
             planSteps.add("    JOIN-SCANNER " + joinInfo.getLimit() + " ROW LIMIT");
         }
-        for (SubPlan subPlan : subPlans) {
-            if (subPlan.getInnerPlan().getEstimatedBytesToScan() == null
-                    || subPlan.getInnerPlan().getEstimatedRowsToScan() == null
-                    || subPlan.getInnerPlan().getEstimateInfoTimestamp() == null) {
-                /*
-                 * If any of the sub plans doesn't have the estimate info available, then we don't
-                 * provide estimate for the overall plan
-                 */
-                estimatedBytes = null;
-                estimatedRows = null;
-                estimateInfoTs = null;
-                break;
-            } else {
-                estimatedBytes =
-                        add(estimatedBytes, subPlan.getInnerPlan().getEstimatedBytesToScan());
-                estimatedRows = add(estimatedRows, subPlan.getInnerPlan().getEstimatedRowsToScan());
-                estimateInfoTs =
-                        getMin(estimateInfoTs, subPlan.getInnerPlan().getEstimateInfoTimestamp());
-            }
-        }
         return new ExplainPlan(planSteps);
     }
 
@@ -520,27 +499,51 @@ public class HashJoinPlan extends DelegateQueryPlan {
 
     @Override
     public Long getEstimatedRowsToScan() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimatedRows;
     }
 
     @Override
     public Long getEstimatedBytesToScan() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimatedBytes;
     }
 
     @Override
     public Long getEstimateInfoTimestamp() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimateInfoTs;
     }
+
+    private void getEstimates() throws SQLException {
+        getEstimatesCalled = true;
+        for (SubPlan subPlan : subPlans) {
+            if (subPlan.getInnerPlan().getEstimatedBytesToScan() == null
+                    || subPlan.getInnerPlan().getEstimatedRowsToScan() == null
+                    || subPlan.getInnerPlan().getEstimateInfoTimestamp() == null) {
+                /*
+                 * If any of the sub plans doesn't have the estimate info available, then we don't
+                 * provide estimate for the overall plan
+                 */
+                estimatedBytes = null;
+                estimatedRows = null;
+                estimateInfoTs = null;
+                break;
+            } else {
+                estimatedBytes =
+                        add(estimatedBytes, subPlan.getInnerPlan().getEstimatedBytesToScan());
+                estimatedRows = add(estimatedRows, subPlan.getInnerPlan().getEstimatedRowsToScan());
+                estimateInfoTs =
+                        getMin(estimateInfoTs, subPlan.getInnerPlan().getEstimateInfoTimestamp());
+            }
+        }
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 3e380da..2436d1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -95,7 +95,7 @@ public class SortMergeJoinPlan implements QueryPlan {
     private Long estimatedBytes;
     private Long estimatedRows;
     private Long estimateInfoTs;
-    private boolean explainPlanCalled;
+    private boolean getEstimatesCalled;
 
     public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table, 
             JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
@@ -157,7 +157,6 @@ public class SortMergeJoinPlan implements QueryPlan {
 
     @Override
     public ExplainPlan getExplainPlan() throws SQLException {
-        explainPlanCalled = true;
         List<String> steps = Lists.newArrayList();
         steps.add("SORT-MERGE-JOIN (" + type.toString().toUpperCase() + ") TABLES");
         for (String step : lhsPlan.getExplainPlan().getPlanSteps()) {
@@ -167,28 +166,6 @@ public class SortMergeJoinPlan implements QueryPlan {
         for (String step : rhsPlan.getExplainPlan().getPlanSteps()) {
             steps.add("    " + step);            
         }
-        if ((lhsPlan.getEstimatedBytesToScan() == null || rhsPlan.getEstimatedBytesToScan() == null)
-                || (lhsPlan.getEstimatedRowsToScan() == null
-                        || rhsPlan.getEstimatedRowsToScan() == null)
-                || (lhsPlan.getEstimateInfoTimestamp() == null
-                        || rhsPlan.getEstimateInfoTimestamp() == null)) {
-            /*
-             * If any of the sub plans doesn't have the estimate info available, then we don't
-             * provide estimate for the overall plan
-             */
-            estimatedBytes = null;
-            estimatedRows = null;
-            estimateInfoTs = null;
-        } else {
-            estimatedBytes =
-                    add(add(estimatedBytes, lhsPlan.getEstimatedBytesToScan()),
-                        rhsPlan.getEstimatedBytesToScan());
-            estimatedRows =
-                    add(add(estimatedRows, lhsPlan.getEstimatedRowsToScan()),
-                        rhsPlan.getEstimatedRowsToScan());
-            estimateInfoTs =
-                    getMin(lhsPlan.getEstimateInfoTimestamp(), rhsPlan.getEstimateInfoTimestamp());
-        }
         return new ExplainPlan(steps);
     }
 
@@ -754,25 +731,51 @@ public class SortMergeJoinPlan implements QueryPlan {
 
     @Override
     public Long getEstimatedRowsToScan() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimatedRows;
     }
 
     @Override
     public Long getEstimatedBytesToScan() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimatedBytes;
     }
 
     @Override
     public Long getEstimateInfoTimestamp() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimateInfoTs;
     }
+
+    private void getEstimates() throws SQLException {
+        getEstimatesCalled = true;
+        if ((lhsPlan.getEstimatedBytesToScan() == null || rhsPlan.getEstimatedBytesToScan() == null)
+                || (lhsPlan.getEstimatedRowsToScan() == null
+                || rhsPlan.getEstimatedRowsToScan() == null)
+                || (lhsPlan.getEstimateInfoTimestamp() == null
+                || rhsPlan.getEstimateInfoTimestamp() == null)) {
+            /*
+             * If any of the sub plans doesn't have the estimate info available, then we don't
+             * provide estimate for the overall plan
+             */
+            estimatedBytes = null;
+            estimatedRows = null;
+            estimateInfoTs = null;
+        } else {
+            estimatedBytes =
+                    add(add(estimatedBytes, lhsPlan.getEstimatedBytesToScan()),
+                            rhsPlan.getEstimatedBytesToScan());
+            estimatedRows =
+                    add(add(estimatedRows, lhsPlan.getEstimatedRowsToScan()),
+                            rhsPlan.getEstimatedRowsToScan());
+            estimateInfoTs =
+                    getMin(lhsPlan.getEstimateInfoTimestamp(), rhsPlan.getEstimateInfoTimestamp());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index e6bf654..3b5168c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -69,7 +69,7 @@ public class UnionPlan implements QueryPlan {
     private Long estimatedRows;
     private Long estimatedBytes;
     private Long estimateInfoTs;
-    private boolean explainPlanCalled;
+    private boolean getEstimatesCalled;
 
     public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
             Integer limit, Integer offset, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException {
@@ -174,7 +174,6 @@ public class UnionPlan implements QueryPlan {
 
     @Override
     public ExplainPlan getExplainPlan() throws SQLException {
-        explainPlanCalled = true;
         List<String> steps = new ArrayList<String>();
         steps.add("UNION ALL OVER " + this.plans.size() + " QUERIES");
         ResultIterator iterator = iterator();
@@ -184,23 +183,6 @@ public class UnionPlan implements QueryPlan {
         for (int i = 1 ; i < steps.size()-offset; i++) {
             steps.set(i, "    " + steps.get(i));
         }
-        for (QueryPlan plan : plans) {
-            if (plan.getEstimatedBytesToScan() == null || plan.getEstimatedRowsToScan() == null
-                    || plan.getEstimateInfoTimestamp() == null) {
-                /*
-                 * If any of the sub plans doesn't have the estimate info available, then we don't
-                 * provide estimate for the overall plan
-                 */
-                estimatedBytes = null;
-                estimatedRows = null;
-                estimateInfoTs = null;
-                break;
-            } else {
-                estimatedBytes = add(estimatedBytes, plan.getEstimatedBytesToScan());
-                estimatedRows = add(estimatedRows, plan.getEstimatedRowsToScan());
-                estimateInfoTs = getMin(estimateInfoTs, plan.getEstimateInfoTimestamp());
-            }
-        }
         return new ExplainPlan(steps);
     }
 
@@ -265,25 +247,46 @@ public class UnionPlan implements QueryPlan {
 
     @Override
     public Long getEstimatedRowsToScan() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimatedRows;
     }
 
     @Override
     public Long getEstimatedBytesToScan() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimatedBytes;
     }
 
     @Override
     public Long getEstimateInfoTimestamp() throws SQLException {
-        if (!explainPlanCalled) {
-            getExplainPlan();
+        if (!getEstimatesCalled) {
+            getEstimates();
         }
         return estimateInfoTs;
     }
+
+    private void getEstimates() throws SQLException {
+        getEstimatesCalled = true;
+        for (QueryPlan plan : plans) {
+            if (plan.getEstimatedBytesToScan() == null || plan.getEstimatedRowsToScan() == null
+                    || plan.getEstimateInfoTimestamp() == null) {
+                /*
+                 * If any of the sub-plans lacks estimate info, then we don't provide an
+                 * estimate for the overall plan.
+                 */
+                estimatedBytes = null;
+                estimatedRows = null;
+                estimateInfoTs = null;
+                break;
+            } else {
+                estimatedBytes = add(estimatedBytes, plan.getEstimatedBytesToScan());
+                estimatedRows = add(estimatedRows, plan.getEstimatedRowsToScan());
+                estimateInfoTs = getMin(estimateInfoTs, plan.getEstimateInfoTimestamp());
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7ef96fe1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 94aeb34..da8beae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -578,7 +578,14 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         @Override
         public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
             CompilableStatement compilableStmt = getStatement();
-            final StatementPlan plan = compilableStmt.compilePlan(stmt, Sequence.ValueOp.VALIDATE_SEQUENCE);
+            StatementPlan compilePlan = compilableStmt.compilePlan(stmt, Sequence.ValueOp.VALIDATE_SEQUENCE);
+            // For a QueryPlan, we need to get its optimized plan; for a MutationPlan, its enclosed QueryPlan
+            // has already been optimized during compilation.
+            if (compilePlan instanceof QueryPlan) {
+                QueryPlan dataPlan = (QueryPlan) compilePlan;
+                compilePlan = stmt.getConnection().getQueryServices().getOptimizer().optimize(stmt, dataPlan);
+            }
+            final StatementPlan plan = compilePlan;
             List<String> planSteps = plan.getExplainPlan().getPlanSteps();
             List<Tuple> tuples = Lists.newArrayListWithExpectedSize(planSteps.size());
             Long estimatedBytesToScan = plan.getEstimatedBytesToScan();


[3/9] phoenix git commit: PHOENIX-3050 Handle DESC columns in child/parent join optimization

Posted by ma...@apache.org.
PHOENIX-3050 Handle DESC columns in child/parent join optimization


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/977699af
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/977699af
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/977699af

Branch: refs/heads/4.x-HBase-0.98
Commit: 977699afe0d66f1434b8bc1c5a751767e563d6ce
Parents: 92b57c7
Author: maryannxue <ma...@gmail.com>
Authored: Wed Dec 6 12:07:16 2017 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 17:17:16 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/join/HashJoinMoreIT.java     |  5 +++++
 .../org/apache/phoenix/compile/JoinCompiler.java | 19 +++++++++++++------
 .../apache/phoenix/compile/QueryCompiler.java    |  6 +++---
 .../apache/phoenix/compile/WhereOptimizer.java   |  5 -----
 4 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/977699af/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
index 37ffd02..f09f1d3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
@@ -895,6 +895,11 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
                     + "FROM ( SELECT ACCOUNT_ID, BUCKET_ID, OBJECT_ID, MAX(OBJECT_VERSION) AS MAXVER "
                     + "       FROM test2961 GROUP BY ACCOUNT_ID, BUCKET_ID, OBJECT_ID) AS X "
                     + "       INNER JOIN test2961 AS OBJ ON X.ACCOUNT_ID = OBJ.ACCOUNT_ID AND X.BUCKET_ID = OBJ.BUCKET_ID AND X.OBJECT_ID = OBJ.OBJECT_ID AND  X.MAXVER = OBJ.OBJECT_VERSION";
+            rs = conn.createStatement().executeQuery("explain " + q);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String dynamicFilter = "DYNAMIC SERVER FILTER BY (OBJ.ACCOUNT_ID, OBJ.BUCKET_ID, OBJ.OBJECT_ID, OBJ.OBJECT_VERSION) IN ((X.ACCOUNT_ID, X.BUCKET_ID, X.OBJECT_ID, X.MAXVER))";
+            assertTrue("Expected '" + dynamicFilter + "' to be used for the query, but got:\n" + plan,
+                    plan.contains(dynamicFilter));
             rs = conn.createStatement().executeQuery(q);
             assertTrue(rs.next());
             assertEquals("2222", rs.getString(4));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/977699af/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f9d8711..f3c4c24 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -108,6 +108,12 @@ import com.google.common.collect.Sets;
 
 public class JoinCompiler {
 
+    public enum Strategy {
+        HASH_BUILD_LEFT,
+        HASH_BUILD_RIGHT,
+        SORT_MERGE,
+    }
+
     public enum ColumnRefType {
         JOINLOCAL,
         GENERAL,
@@ -487,7 +493,7 @@ public class JoinCompiler {
             return dependencies;
         }
 
-        public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, boolean sortExpressions) throws SQLException {
+        public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, Strategy strategy) throws SQLException {
             if (onConditions.isEmpty()) {
                 return new Pair<List<Expression>, List<Expression>>(
                         Collections.<Expression> singletonList(LiteralExpression.newConstant(1)),
@@ -503,15 +509,16 @@ public class JoinCompiler {
                 rhsCompiler.reset();
                 Expression right = condition.getRHS().accept(rhsCompiler);
                 PDataType toType = getCommonType(left.getDataType(), right.getDataType());
-                if (left.getDataType() != toType || left.getSortOrder() == SortOrder.DESC) {
-                    left = CoerceExpression.create(left, toType, SortOrder.ASC, left.getMaxLength());
+                SortOrder toSortOrder = strategy == Strategy.SORT_MERGE ? SortOrder.ASC : (strategy == Strategy.HASH_BUILD_LEFT ? right.getSortOrder() : left.getSortOrder());
+                if (left.getDataType() != toType || left.getSortOrder() != toSortOrder) {
+                    left = CoerceExpression.create(left, toType, toSortOrder, left.getMaxLength());
                 }
-                if (right.getDataType() != toType || right.getSortOrder() == SortOrder.DESC) {
-                    right = CoerceExpression.create(right, toType, SortOrder.ASC, right.getMaxLength());
+                if (right.getDataType() != toType || right.getSortOrder() != toSortOrder) {
+                    right = CoerceExpression.create(right, toType, toSortOrder, right.getMaxLength());
                 }
                 compiled.add(new Pair<Expression, Expression>(left, right));
             }
-            if (sortExpressions) {
+            if (strategy != Strategy.SORT_MERGE) {
                 Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
                     @Override
                     public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/977699af/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 9443110..c5bc44c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -305,7 +305,7 @@ public class QueryCompiler {
                 JoinSpec joinSpec = joinSpecs.get(i);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
                 joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
-                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], true);
+                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT);
                 joinExpressions[i] = joinConditions.getFirst();
                 List<Expression> hashExpressions = joinConditions.getSecond();
                 Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
@@ -367,7 +367,7 @@ public class QueryCompiler {
             context.setCurrentTable(rhsTableRef);
             context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
-            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
+            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT);
             List<Expression> joinExpressions = joinConditions.getSecond();
             List<Expression> hashExpressions = joinConditions.getFirst();
             boolean needsMerge = lhsJoin.hasPostReference();
@@ -420,7 +420,7 @@ public class QueryCompiler {
         QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
         PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
         
-        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, false);
+        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE);
         List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
         List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/977699af/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index ccf073a..87f00e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -326,11 +326,6 @@ public class WhereOptimizer {
         PTable table = context.getCurrentTable().getTable();
         for (int i = 0; i < expressions.size(); i++) {
             Expression expression = expressions.get(i);
-            // TODO this is a temporary fix for PHOENIX-3029.
-            if (expression instanceof CoerceExpression
-                    && expression.getSortOrder() != expression.getChildren().get(0).getSortOrder()) {
-                continue;
-            }
             KeyExpressionVisitor visitor = new KeyExpressionVisitor(context, table);
             KeyExpressionVisitor.KeySlots keySlots = expression.accept(visitor);
             int minPkPos = Integer.MAX_VALUE; 


[2/9] phoenix git commit: PHOENIX-4322 DESC primary key column with variable length does not work in SkipScanFilter

Posted by ma...@apache.org.
PHOENIX-4322 DESC primary key column with variable length does not work in SkipScanFilter


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/92b57c78
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/92b57c78
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/92b57c78

Branch: refs/heads/4.x-HBase-0.98
Commit: 92b57c7893c91d90d78e30171e233043dbcb4583
Parents: 541d6ac
Author: maryannxue <ma...@gmail.com>
Authored: Tue Dec 5 10:59:41 2017 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 17:16:42 2018 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/phoenix/end2end/SortOrderIT.java  | 11 ++++++++++-
 .../expression/RowValueConstructorExpression.java        |  4 ++--
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/92b57c78/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
index 655dbb1..3f749c1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
@@ -167,7 +167,16 @@ public class SortOrderIT extends ParallelStatsDisabledIT {
         runQueryTest(ddl, upsert("oid", "code"), insertedRows, new Object[][]{{"o2", 2}}, new WhereCondition("oid", "IN", "('o2')"),
             table);
     }
-    
+
+    @Test
+    public void inDescCompositePK3() throws Exception {
+        String table = generateUniqueName();
+        String ddl = "CREATE table " + table + " (oid VARCHAR NOT NULL, code VARCHAR NOT NULL constraint pk primary key (oid DESC, code DESC))";
+        Object[][] insertedRows = new Object[][]{{"o1", "1"}, {"o2", "2"}, {"o3", "3"}};
+        runQueryTest(ddl, upsert("oid", "code"), insertedRows, new Object[][]{{"o2", "2"}, {"o1", "1"}}, new WhereCondition("(oid, code)", "IN", "(('o2', '2'), ('o1', '1'))"),
+                table);
+    }
+
     @Test
     public void likeDescCompositePK1() throws Exception {
         String table = generateUniqueName();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92b57c78/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
index 15f6e3e..9bb7234 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
@@ -199,8 +199,8 @@ public class RowValueConstructorExpression extends BaseCompoundExpression {
                     // as otherwise we need it to ensure sort order is correct
                     for (int k = expressionCount -1 ; 
                             k >=0 &&  getChildren().get(k).getDataType() != null 
-                                  && !getChildren().get(k).getDataType().isFixedWidth() 
-                                  && outputBytes[outputSize-1] == QueryConstants.SEPARATOR_BYTE ; k--) {
+                                  && !getChildren().get(k).getDataType().isFixedWidth()
+                                  && outputBytes[outputSize-1] == SchemaUtil.getSeparatorByte(true, false, getChildren().get(k)) ; k--) {
                         outputSize--;
                     }
                     ptr.set(outputBytes, 0, outputSize);


[7/9] phoenix git commit: PHOENIX-1556 Base hash versus sort merge join decision on cost

Posted by ma...@apache.org.
PHOENIX-1556 Base hash versus sort merge join decision on cost


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6914d54d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6914d54d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6914d54d

Branch: refs/heads/4.x-HBase-0.98
Commit: 6914d54d99b4fafae44d1a3397c44ba6e5d10368
Parents: 2c75823
Author: maryannxue <ma...@gmail.com>
Authored: Mon Feb 12 14:07:30 2018 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 21:19:01 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/CostBasedDecisionIT.java    | 420 ++++++++++++-----
 .../apache/phoenix/compile/JoinCompiler.java    |  43 ++
 .../phoenix/compile/ListJarsQueryPlan.java      |   8 +-
 .../apache/phoenix/compile/QueryCompiler.java   | 455 ++++++++++---------
 .../org/apache/phoenix/compile/QueryPlan.java   |   2 +
 .../apache/phoenix/compile/TraceQueryPlan.java  |   6 +
 .../apache/phoenix/execute/AggregatePlan.java   |  41 +-
 .../phoenix/execute/ClientAggregatePlan.java    |  46 +-
 .../phoenix/execute/ClientProcessingPlan.java   |   4 +
 .../apache/phoenix/execute/ClientScanPlan.java  |  22 +-
 .../apache/phoenix/execute/CorrelatePlan.java   |  26 +-
 .../apache/phoenix/execute/CursorFetchPlan.java |   6 +
 .../apache/phoenix/execute/HashJoinPlan.java    | 128 ++++--
 .../execute/LiteralResultIterationPlan.java     |   6 +
 .../org/apache/phoenix/execute/ScanPlan.java    |  14 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |  20 +-
 .../phoenix/execute/TupleProjectionPlan.java    |   6 +
 .../org/apache/phoenix/execute/UnionPlan.java   |  12 +-
 .../apache/phoenix/execute/UnnestArrayPlan.java |   6 +
 .../execute/visitor/AvgRowWidthVisitor.java     | 205 +++++++++
 .../execute/visitor/ByteCountVisitor.java       | 125 +++++
 .../execute/visitor/QueryPlanVisitor.java       |  46 ++
 .../execute/visitor/RowCountVisitor.java        | 335 ++++++++++++++
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   6 +
 .../java/org/apache/phoenix/util/CostUtil.java  |  61 +--
 .../query/ParallelIteratorsSplitTest.java       |   6 +
 26 files changed, 1615 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
index a3584ce..493855a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -32,12 +32,16 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
 
 public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+    private final String testTable500;
+    private final String testTable990;
+    private final String testTable1000;
 
     @BeforeClass
     public static void doSetup() throws Exception {
@@ -46,9 +50,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
         props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
         props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
         props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true));
+        props.put(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, Long.toString(150000));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
+    public CostBasedDecisionIT() throws Exception {
+        testTable500 = initTestTableValues(500);
+        testTable990 = initTestTableValues(990);
+        testTable1000 = initTestTableValues(1000);
+    }
+
     @Test
     public void testCostOverridesStaticPlanOrdering1() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -64,10 +75,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
 
             String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey";
             // Use the data table plan that opts out order-by when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
-                    plan.contains("FULL SCAN"));
+            verifyQueryPlan(query, "FULL SCAN");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -81,10 +89,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the index table plan that has a lower cost when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
-                    plan.contains("RANGE SCAN"));
+            verifyQueryPlan(query, "RANGE SCAN");
         } finally {
             conn.close();
         }
@@ -103,12 +108,12 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
                     "c2 VARCHAR)");
             conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
 
-            String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1";
             // Use the index table plan that opts out order-by when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
-                    plan.contains("RANGE SCAN"));
+            verifyQueryPlan(query,
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
+                    "    SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+                    "CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -124,10 +129,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             // Given that the range on C1 is meaningless and group-by becomes
             // order-preserving if using the data table, the data table plan should
             // come out as the best plan based on the costs.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
-                    plan.contains("FULL SCAN"));
+            verifyQueryPlan(query,
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+                    "    SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
+                    "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+                    "CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -150,14 +156,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
 
             String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
             // Use the idx2 plan with a wider PK slot span when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String indexPlan =
+            verifyQueryPlan(query,
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
                     "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
-                    "CLIENT MERGE SORT";
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+                    "CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -171,14 +173,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the idx2 plan that scans less data when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String dataPlan =
+            verifyQueryPlan(query,
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
                     "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
-                    "CLIENT MERGE SORT";
-            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(dataPlan));
+                    "CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -201,15 +199,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
 
             String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
             // Use the idx2 plan with a wider PK slot span when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String indexPlan =
+            verifyQueryPlan(query,
                     "UPSERT SELECT\n" +
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
                             "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
-                            "CLIENT MERGE SORT";
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+                            "CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -223,15 +217,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the idx2 plan that scans less data when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String dataPlan =
+            verifyQueryPlan(query,
                     "UPSERT SELECT\n" +
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
                             "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
-                            "CLIENT MERGE SORT";
-            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(dataPlan));
+                            "CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -254,15 +244,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
 
             String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
             // Use the idx2 plan with a wider PK slot span when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String indexPlan =
+            verifyQueryPlan(query,
                     "DELETE ROWS\n" +
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
                             "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
-                            "CLIENT MERGE SORT";
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+                            "CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -276,15 +262,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the idx2 plan that scans less data when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String dataPlan =
+            verifyQueryPlan(query,
                     "DELETE ROWS\n" +
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
                             "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
-                            "CLIENT MERGE SORT";
-            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(dataPlan));
+                            "CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -303,22 +285,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
                     "c2 VARCHAR)");
             conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
 
-            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 "
-                    + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1 "
+                    + "UNION ALL SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey >= 'a' GROUP BY c1";
             // Use the default plan when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String defaultPlan =
+            verifyQueryPlan(query,
                     "UNION ALL OVER 2 QUERIES\n" +
-                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
                     "        SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
                     "    CLIENT MERGE SORT\n" +
-                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
-                    "        SERVER FILTER BY FIRST KEY ONLY\n" +
-                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
-                    "    CLIENT MERGE SORT";
-            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(defaultPlan));
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['a'] - [*]\n" +
+                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+                    "    CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -332,19 +309,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the optimal plan based on cost when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String optimizedPlan =
+            verifyQueryPlan(query,
                     "UNION ALL OVER 2 QUERIES\n" +
                     "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
-                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
                     "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
                     "    CLIENT MERGE SORT\n" +
-                    "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
-                    "        SERVER FILTER BY C1 LIKE 'X%'\n" +
-                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]";
-            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(optimizedPlan));
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" >= 'a'\n" +
+                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+                    "    CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -363,23 +337,18 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
                     "c2 VARCHAR)");
             conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
 
-            String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 "
-                    + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
-                    + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
+            String query = "SELECT t1.rowkey, t1.c1, t1.c2, t2.c1, mc2 FROM " + tableName + " t1 "
+                    + "JOIN (SELECT c1, max(rowkey) mrk, max(c2) mc2 FROM " + tableName + " where rowkey <= 'z' GROUP BY c1) t2 "
+                    + "ON t1.rowkey = t2.mrk WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
             // Use the default plan when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String defaultPlan =
+            verifyQueryPlan(query,
                     "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
                     "    SERVER FILTER BY C1 LIKE 'X0%'\n" +
                     "    PARALLEL INNER-JOIN TABLE 0\n" +
-                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
-                    "            SERVER FILTER BY FIRST KEY ONLY\n" +
-                    "            SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
+                    "            SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
                     "        CLIENT MERGE SORT\n" +
-                    "    DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
-            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(defaultPlan));
+                    "    DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.MRK)");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -393,20 +362,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the optimal plan based on cost when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String optimizedPlan =
+            verifyQueryPlan(query,
                     "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" +
                     "    SERVER FILTER BY FIRST KEY ONLY\n" +
                     "    SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
                     "CLIENT MERGE SORT\n" +
                     "    PARALLEL INNER-JOIN TABLE 0\n" +
-                    "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
-                    "            SERVER FILTER BY C1 LIKE 'X%'\n" +
-                    "            SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" +
-                    "    DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)";
-            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(optimizedPlan));
+                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+                    "            SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
+                    "            SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+                    "        CLIENT MERGE SORT\n" +
+                    "    DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.MRK)");
         } finally {
             conn.close();
         }
@@ -432,10 +398,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
 
             // Use the index table plan that opts out order-by when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+            verifyQueryPlan(query, indexPlan);
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -449,18 +412,261 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the data table plan that has a lower cost when stats are available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(dataPlan));
+            verifyQueryPlan(query, dataPlan);
 
             // Use the index table plan as has been hinted.
-            rs = conn.createStatement().executeQuery("explain " + hintedQuery);
-            plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+            verifyQueryPlan(hintedQuery, indexPlan);
         } finally {
             conn.close();
         }
     }
+
+    /** Sort-merge-join w/ both children ordered wins over hash-join. */
+    @Test
+    public void testJoinStrategy() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.ID";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000;
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Sort-merge-join w/ both children ordered wins over hash-join in an un-grouped aggregate query. */
+    @Test
+    public void testJoinStrategy2() throws Exception {
+        String q = "SELECT count(*)\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.ID\n" +
+                "WHERE t1.COL1 < 200";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+                "        SERVER FILTER BY COL1 < 200\n" +
+                "AND (SKIP MERGE)\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "        SERVER FILTER BY FIRST KEY ONLY\n" +
+                "CLIENT AGGREGATE INTO SINGLE ROW";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Hash-join w/ PK/FK optimization wins over sort-merge-join w/ larger side ordered. */
+    @Test
+    public void testJoinStrategy3() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.COL1 = t2.ID\n" +
+                "WHERE t1.ID > 200";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+                "    DYNAMIC SERVER FILTER BY T2.ID IN (T1.COL1)";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Hash-join w/ PK/FK optimization wins over hash-join w/o PK/FK optimization when two sides are close in size. */
+    @Test
+    public void testJoinStrategy4() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable990 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.COL1";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Hash-join wins over sort-merge-join w/ smaller side ordered. */
+    @Test
+    public void testJoinStrategy5() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.COL1\n" +
+                "WHERE t1.ID > 200";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Hash-join wins over sort-merge-join w/o any side ordered. */
+    @Test
+    public void testJoinStrategy6() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.COL1 = t2.COL1\n" +
+                "WHERE t1.ID > 200";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Hash-join wins over sort-merge-join w/ both sides ordered in an order-by query.
+     * This is because order-by can only be done on the client side after sort-merge-join
+     * and order-by w/o limit on the client side is very expensive.
+     */
+    @Test
+    public void testJoinStrategy7() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.ID\n" +
+                "ORDER BY t1.COL1";
+        String expected =
+                "CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    SERVER SORTED BY [T1.COL1]\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+                "    DYNAMIC SERVER FILTER BY T2.ID IN (T1.ID)";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Sort-merge-join w/ both sides ordered wins over hash-join in an order-by limit query.
+     * This is because order-by can only be done on the client side after sort-merge-join
+     * but order-by w/ limit on the client side is less expensive.
+     */
+    @Test
+    public void testJoinStrategy8() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.ID\n" +
+                "ORDER BY t1.COL1 LIMIT 5";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "CLIENT TOP 5 ROWS SORTED BY [T1.COL1]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Multi-table join: sort-merge-join chosen since all join keys are PK.
+     */
+    @Test
+    public void testJoinStrategy9() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable1000 + " t1 LEFT JOIN " + testTable500 + " t2\n" +
+                "ON t1.ID = t2.ID AND t2.ID > 200\n" +
+                "LEFT JOIN " + testTable990 + " t3\n" +
+                "ON t1.ID = t3.ID AND t3.ID < 100";
+        String expected =
+                "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+                "    SORT-MERGE-JOIN (LEFT) TABLES\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    AND\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Multi-table join: a mix of join strategies chosen based on cost.
+     */
+    @Test
+    public void testJoinStrategy10() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" +
+                "ON t1.ID = t2.COL1 AND t2.ID > 200\n" +
+                "JOIN " + testTable990 + " t3\n" +
+                "ON t1.ID = t3.ID AND t3.ID < 100";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "        PARALLEL INNER-JOIN TABLE 0\n" +
+                "            CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+                "        DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Multi-table join: hash-join two tables in parallel since two RHS tables are both small
+     * and can fit in memory at the same time.
+     */
+    @Test
+    public void testJoinStrategy11() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" +
+                "ON t1.COL2 = t2.COL1 AND t2.ID > 200\n" +
+                "JOIN " + testTable990 + " t3\n" +
+                "ON t1.COL1 = t3.COL2 AND t3.ID < 100";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+                "    PARALLEL INNER-JOIN TABLE 1\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Multi-table join: similar to {@link #testJoinStrategy11()}, but the two RHS
+     * tables cannot fit in memory at the same time, and thus a mix of join strategies
+     * is chosen based on cost.
+     */
+    @Test
+    public void testJoinStrategy12() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable1000 + " t1 JOIN " + testTable990 + " t2\n" +
+                "ON t1.COL2 = t2.COL1\n" +
+                "JOIN " + testTable990 + " t3\n" +
+                "ON t1.COL1 = t3.COL2";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "        SERVER SORTED BY [T1.COL1]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "        PARALLEL INNER-JOIN TABLE 0\n" +
+                "            CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 991-WAY FULL SCAN OVER " + testTable990 + "\n" +
+                "        SERVER SORTED BY [T3.COL2]\n" +
+                "    CLIENT MERGE SORT";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Asserts that EXPLAIN of {@code query} contains {@code expected}; the connection is always closed. */
+    private static void verifyQueryPlan(String query, String expected) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + expected + "' in the plan:\n" + plan + ".", plan.contains(expected));
+        }
+    }
+
+    private static String initTestTableValues(int rows) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "ID INTEGER NOT NULL PRIMARY KEY,\n" +
+                    "COL1 INTEGER," +
+                    "COL2 INTEGER)");
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + tableName + " VALUES(?, ?, ?)");
+            for (int i = 0; i < rows; i++) {
+                stmt.setInt(1, i + 1);
+                stmt.setInt(2, rows - i);
+                stmt.setInt(3, rows + i);
+                stmt.execute();
+            }
+            conn.commit();
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+            return tableName;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f3c4c24..f5a7e39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -71,6 +71,8 @@ import org.apache.phoenix.parse.TableNodeVisitor;
 import org.apache.phoenix.parse.TableWildcardParseNode;
 import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.parse.WildcardParseNode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.LocalIndexDataColumnRef;
@@ -124,6 +126,8 @@ public class JoinCompiler {
     private final ColumnResolver origResolver;
     private final boolean useStarJoin;
     private final Map<ColumnRef, ColumnRefType> columnRefs;
+    private final boolean useSortMergeJoin;
+    private final boolean costBased;
 
 
     private JoinCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) {
@@ -132,6 +136,9 @@ public class JoinCompiler {
         this.origResolver = resolver;
         this.useStarJoin = !select.getHint().hasHint(Hint.NO_STAR_JOIN);
         this.columnRefs = new HashMap<ColumnRef, ColumnRefType>();
+        this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
+        this.costBased = statement.getConnection().getQueryServices().getProps().getBoolean(
+                QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
     }
 
     public static JoinTable compile(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
@@ -365,6 +372,42 @@ public class JoinCompiler {
         }
 
         /**
+         * Return a list of all applicable join strategies. The order of the strategies in the
+         * returned list is based on the static rule below. However, the caller can decide on
+         * an optimal join strategy by evaluating and comparing the costs.
+         * 1. If hint USE_SORT_MERGE_JOIN is specified,
+         *    return a singleton list containing only SORT_MERGE.
+         * 2. If 1) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B"; or
+         *       2) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B (LEFT/INNER/SEMI/ANTI JOIN C)+"
+         *          and hint NO_STAR_JOIN is not specified,
+         *    add BUILD_RIGHT to the returned list.
+         * 3. If matches pattern "A RIGHT/INNER JOIN B", where B is either a named table reference
+         *    or a flat sub-query,
+         *    add BUILD_LEFT to the returned list.
+         * 4. add SORT_MERGE to the returned list.
+         */
+        public List<Strategy> getApplicableJoinStrategies() {
+            List<Strategy> strategies = Lists.newArrayList();
+            if (useSortMergeJoin) {
+                strategies.add(Strategy.SORT_MERGE);
+            } else {
+                if (getStarJoinVector() != null) {
+                    strategies.add(Strategy.HASH_BUILD_RIGHT);
+                }
+                JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+                JoinType type = lastJoinSpec.getType();
+                if ((type == JoinType.Right || type == JoinType.Inner)
+                        && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
+                        && lastJoinSpec.getJoinTable().getTable().isFlat()) {
+                    strategies.add(Strategy.HASH_BUILD_LEFT);
+                }
+                strategies.add(Strategy.SORT_MERGE);
+            }
+
+            return strategies;
+        }
+
+        /**
          * Returns a boolean vector indicating whether the evaluation of join expressions
          * can be evaluated at an early stage if the input JoinSpec can be taken as a
          * star join. Otherwise returns null.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index fa48e52..e536ae9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
@@ -251,7 +252,12 @@ public class ListJarsQueryPlan implements QueryPlan {
         return false;
     }
 
-	@Override
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
 	public Set<TableRef> getSourceRefs() {
 		return Collections.<TableRef>emptySet();
 	}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index c5bc44c..c8650b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.client.Query;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -52,6 +53,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.EqualParseNode;
 import org.apache.phoenix.parse.HintNode.Hint;
@@ -63,7 +65,10 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SubqueryParseNode;
 import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PDatum;
@@ -102,10 +107,10 @@ public class QueryCompiler {
     private final ParallelIteratorFactory parallelIteratorFactory;
     private final SequenceManager sequenceManager;
     private final boolean projectTuples;
-    private final boolean useSortMergeJoin;
     private final boolean noChildParentJoinOptimization;
     private final QueryPlan dataPlan;
-    
+    private final boolean costBased;
+
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
         this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan);
     }
@@ -119,9 +124,10 @@ public class QueryCompiler {
         this.parallelIteratorFactory = parallelIteratorFactory;
         this.sequenceManager = sequenceManager;
         this.projectTuples = projectTuples;
-        this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
         this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION);
-        if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
+        ConnectionQueryServices services = statement.getConnection().getQueryServices();
+        this.costBased = services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+        if (services.getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
             this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
         }
         if (select.getHint().hasHint(Hint.NO_CACHE)) {
@@ -201,41 +207,17 @@ public class QueryCompiler {
         }
     }
 
-    /*
+    /**
      * Call compileJoinQuery() for join queries recursively down to the leaf JoinTable nodes.
-     * This matches the input JoinTable node against patterns in the following order:
-     * 1. A (leaf JoinTable node, which can be a named table reference or a subquery of any kind.)
-     *    Returns the compilation result of a single table scan or of an independent subquery.
-     * 2. Matching either of (when hint USE_SORT_MERGE_JOIN not specified):
-     *        1) A LEFT/INNER JOIN B
-     *        2) A LEFT/INNER JOIN B (LEFT/INNER JOIN C)+, if hint NO_STAR_JOIN not specified
-     *        where A can be a named table reference or a flat subquery, and B, C, ... can be a named
-     *        table reference, a sub-join or a subquery of any kind.
-     *    Returns a HashJoinPlan{scan: A, hash: B, C, ...}.
-     * 3. Matching pattern:
-     *        A RIGHT/INNER JOIN B (when hint USE_SORT_MERGE_JOIN not specified)
-     *        where B can be a named table reference or a flat subquery, and A can be a named table
-     *        reference, a sub-join or a subquery of any kind.
-     *    Returns a HashJoinPlan{scan: B, hash: A}.
-     *    NOTE that "A LEFT/RIGHT/INNER/FULL JOIN B RIGHT/INNER JOIN C" is viewed as
-     *    "(A LEFT/RIGHT/INNER/FULL JOIN B) RIGHT/INNER JOIN C" here, which means the left part in the
-     *    parenthesis is considered a sub-join.
-     *    viewed as a sub-join.
-     * 4. All the rest that do not qualify for previous patterns or conditions, including FULL joins.
-     *    Returns a SortMergeJoinPlan, the sorting part of which is pushed down to the JoinTable nodes
-     *    of both sides as order-by clauses.
-     * NOTE that SEMI or ANTI joins are treated the same way as LEFT joins in JoinTable pattern matching.
-     *    
-     * If no join algorithm hint is provided, according to the above compilation process, a join query 
-     * plan can probably consist of both HashJoinPlan and SortMergeJoinPlan which may enclose each other.
-     * TODO 1) Use table statistics to guide the choice of join plans.
-     *      2) Make it possible to hint a certain join algorithm for a specific join step.
+     * If it is a leaf node, call compileSingleFlatQuery() or compileSubquery(), otherwise:
+     *      1) If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, return the
+     *         join plan with the best cost. Note that the "best" plan is only locally optimal,
+     *         and might or might not be globally optimal.
+     *      2) Otherwise, return the join plan compiled with the default strategy.
+     * @see JoinCompiler.JoinTable#getApplicableJoinStrategies()
      */
-    @SuppressWarnings("unchecked")
     protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
-        byte[] emptyByteArray = new byte[0];
-        List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
-        if (joinSpecs.isEmpty()) {
+        if (joinTable.getJoinSpecs().isEmpty()) {
             Table table = joinTable.getTable();
             SelectStatement subquery = table.getAsSubquery(orderBy);
             if (!table.isSubselect()) {
@@ -252,199 +234,230 @@ public class QueryCompiler {
             context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
             return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), table.compilePostFilterExpression(context));
         }
-        
-        boolean[] starJoinVector;
-        if (!this.useSortMergeJoin && (starJoinVector = joinTable.getStarJoinVector()) != null) {
-            Table table = joinTable.getTable();
-            PTable initialProjectedTable;
-            TableRef tableRef;
-            SelectStatement query;
-            TupleProjector tupleProjector;
-            if (!table.isSubselect()) {
-                context.setCurrentTable(table.getTableRef());
-                initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
-                tableRef = table.getTableRef();
-                table.projectColumns(context.getScan());
-                query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
-                tupleProjector = new TupleProjector(initialProjectedTable);
-            } else {
-                SelectStatement subquery = table.getAsSubquery(orderBy);
-                QueryPlan plan = compileSubquery(subquery, false);
-                initialProjectedTable = table.createProjectedTable(plan.getProjector());
-                tableRef = plan.getTableRef();
-                context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
-                query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
-                tupleProjector = new TupleProjector(plan.getProjector());
+
+        List<JoinCompiler.Strategy> strategies = joinTable.getApplicableJoinStrategies();
+        assert strategies.size() > 0;
+        if (!costBased || strategies.size() == 1) {
+            return compileJoinQuery(
+                    strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+        }
+
+        QueryPlan bestPlan = null;
+        Cost bestCost = null;
+        for (JoinCompiler.Strategy strategy : strategies) {
+            StatementContext newContext = new StatementContext(
+                    context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager());
+            QueryPlan plan = compileJoinQuery(
+                    strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+            Cost cost = plan.getCost();
+            if (bestPlan == null || cost.compareTo(bestCost) < 0) {
+                bestPlan = plan;
+                bestCost = cost;
             }
-            context.setCurrentTable(tableRef);
-            PTable projectedTable = initialProjectedTable;
-            int count = joinSpecs.size();
-            ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
-            List<Expression>[] joinExpressions = new List[count];
-            JoinType[] joinTypes = new JoinType[count];
-            PTable[] tables = new PTable[count];
-            int[] fieldPositions = new int[count];
-            StatementContext[] subContexts = new StatementContext[count];
-            QueryPlan[] subPlans = new QueryPlan[count];
-            HashSubPlan[] hashPlans = new HashSubPlan[count];
-            fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
-            for (int i = 0; i < count; i++) {
-                JoinSpec joinSpec = joinSpecs.get(i);
-                Scan subScan = ScanUtil.newScan(originalScan);
-                subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
-                boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
-                if (hasPostReference) {
-                    tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
-                    projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
+        }
+        context.setResolver(bestPlan.getContext().getResolver());
+        context.setCurrentTable(bestPlan.getContext().getCurrentTable());
+        return bestPlan;
+    }
+
+    protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
+        byte[] emptyByteArray = new byte[0];
+        List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
+        switch (strategy) {
+            case HASH_BUILD_RIGHT: {
+                boolean[] starJoinVector = joinTable.getStarJoinVector();
+                Table table = joinTable.getTable();
+                PTable initialProjectedTable;
+                TableRef tableRef;
+                SelectStatement query;
+                TupleProjector tupleProjector;
+                if (!table.isSubselect()) {
+                    context.setCurrentTable(table.getTableRef());
+                    initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
+                    tableRef = table.getTableRef();
+                    table.projectColumns(context.getScan());
+                    query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
+                    tupleProjector = new TupleProjector(initialProjectedTable);
                 } else {
-                    tables[i] = null;
+                    SelectStatement subquery = table.getAsSubquery(orderBy);
+                    QueryPlan plan = compileSubquery(subquery, false);
+                    initialProjectedTable = table.createProjectedTable(plan.getProjector());
+                    tableRef = plan.getTableRef();
+                    context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+                    query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+                    tupleProjector = new TupleProjector(plan.getProjector());
                 }
-            }
-            for (int i = 0; i < count; i++) {
-                JoinSpec joinSpec = joinSpecs.get(i);
-                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
-                joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
-                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT);
-                joinExpressions[i] = joinConditions.getFirst();
-                List<Expression> hashExpressions = joinConditions.getSecond();
-                Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
-                boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
-                Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
-                Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
-                joinTypes[i] = joinSpec.getType();
-                if (i < count - 1) {
-                    fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+                context.setCurrentTable(tableRef);
+                PTable projectedTable = initialProjectedTable;
+                int count = joinSpecs.size();
+                ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
+                List<Expression>[] joinExpressions = new List[count];
+                JoinType[] joinTypes = new JoinType[count];
+                PTable[] tables = new PTable[count];
+                int[] fieldPositions = new int[count];
+                StatementContext[] subContexts = new StatementContext[count];
+                QueryPlan[] subPlans = new QueryPlan[count];
+                HashSubPlan[] hashPlans = new HashSubPlan[count];
+                fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
+                for (int i = 0; i < count; i++) {
+                    JoinSpec joinSpec = joinSpecs.get(i);
+                    Scan subScan = ScanUtil.newScan(originalScan);
+                    subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+                    subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
+                    boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
+                    if (hasPostReference) {
+                        tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
+                        projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
+                    } else {
+                        tables[i] = null;
+                    }
                 }
-                hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
-            }
-            TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
-            QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
-            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
-            Integer limit = null;
-            Integer offset = null;
-            if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
-                limit = plan.getLimit();
-                offset = plan.getOffset();
-            }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
-                    starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
-            return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
-        }
-        
-        JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
-        JoinType type = lastJoinSpec.getType();
-        if (!this.useSortMergeJoin 
-                && (type == JoinType.Right || type == JoinType.Inner) 
-                && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
-                && lastJoinSpec.getJoinTable().getTable().isFlat()) {            
-            JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
-            Table rhsTable = rhsJoinTable.getTable();
-            JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
-            Scan subScan = ScanUtil.newScan(originalScan);
-            StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-            QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
-            PTable rhsProjTable;
-            TableRef rhsTableRef;
-            SelectStatement rhs;
-            TupleProjector tupleProjector;
-            if (!rhsTable.isSubselect()) {
-                context.setCurrentTable(rhsTable.getTableRef());
-                rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
-                rhsTableRef = rhsTable.getTableRef();
-                rhsTable.projectColumns(context.getScan());
-                rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
-                tupleProjector = new TupleProjector(rhsProjTable);
-            } else {
-                SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
-                QueryPlan plan = compileSubquery(subquery, false);
-                rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
-                rhsTableRef = plan.getTableRef();
-                context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
-                rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
-                tupleProjector = new TupleProjector(plan.getProjector());
+                for (int i = 0; i < count; i++) {
+                    JoinSpec joinSpec = joinSpecs.get(i);
+                    context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
+                    joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
+                    Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], strategy);
+                    joinExpressions[i] = joinConditions.getFirst();
+                    List<Expression> hashExpressions = joinConditions.getSecond();
+                    Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
+                    boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
+                    Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
+                    Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
+                    joinTypes[i] = joinSpec.getType();
+                    if (i < count - 1) {
+                        fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+                    }
+                    hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+                }
+                TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+                QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+                Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
+                Integer limit = null;
+                Integer offset = null;
+                if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
+                    limit = plan.getLimit();
+                    offset = plan.getOffset();
+                }
+                HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
+                        starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+                return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
             }
-            context.setCurrentTable(rhsTableRef);
-            context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
-            ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
-            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT);
-            List<Expression> joinExpressions = joinConditions.getSecond();
-            List<Expression> hashExpressions = joinConditions.getFirst();
-            boolean needsMerge = lhsJoin.hasPostReference();
-            PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
-            int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
-            PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
-            TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
-            context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
-            QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
-            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
-            Integer limit = null;
-            Integer offset = null;
-            if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
-                limit = rhsPlan.getLimit();
-                offset = rhsPlan.getOffset();
+            case HASH_BUILD_LEFT: {
+                JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+                JoinType type = lastJoinSpec.getType();
+                JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
+                Table rhsTable = rhsJoinTable.getTable();
+                JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+                Scan subScan = ScanUtil.newScan(originalScan);
+                StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
+                PTable rhsProjTable;
+                TableRef rhsTableRef;
+                SelectStatement rhs;
+                TupleProjector tupleProjector;
+                if (!rhsTable.isSubselect()) {
+                    context.setCurrentTable(rhsTable.getTableRef());
+                    rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
+                    rhsTableRef = rhsTable.getTableRef();
+                    rhsTable.projectColumns(context.getScan());
+                    rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
+                    tupleProjector = new TupleProjector(rhsProjTable);
+                } else {
+                    SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
+                    QueryPlan plan = compileSubquery(subquery, false);
+                    rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
+                    rhsTableRef = plan.getTableRef();
+                    context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+                    rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+                    tupleProjector = new TupleProjector(plan.getProjector());
+                }
+                context.setCurrentTable(rhsTableRef);
+                context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
+                ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[]{new ImmutableBytesPtr(emptyByteArray)};
+                Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, strategy);
+                List<Expression> joinExpressions = joinConditions.getSecond();
+                List<Expression> hashExpressions = joinConditions.getFirst();
+                boolean needsMerge = lhsJoin.hasPostReference();
+                PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
+                int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
+                PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
+                TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
+                QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+                Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
+                Integer limit = null;
+                Integer offset = null;
+                if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
+                    limit = rhsPlan.getLimit();
+                    offset = rhsPlan.getOffset();
+                }
+                HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions},
+                        new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true},
+                        new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+                Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
+                getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
+                return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
             }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] { joinExpressions },
-                    new JoinType[] { type == JoinType.Right ? JoinType.Left : type }, new boolean[] { true },
-                    new PTable[] { lhsTable }, new int[] { fieldPosition }, postJoinFilterExpression,  QueryUtil.getOffsetLimit(limit, offset));
-            Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
-            getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
-            return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
-        }
-        
-        JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
-        JoinTable rhsJoin = lastJoinSpec.getJoinTable();        
-        if (type == JoinType.Right) {
-            JoinTable temp = lhsJoin;
-            lhsJoin = rhsJoin;
-            rhsJoin = temp;
-        }
-        
-        List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
-        List<OrderByNode> lhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
-        List<OrderByNode> rhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
-        for (EqualParseNode condition : joinConditionNodes) {
-            lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
-            rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
-        }
-        
-        Scan lhsScan = ScanUtil.newScan(originalScan);
-        StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
-        boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
-        QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
-        PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
-        boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
-        
-        Scan rhsScan = ScanUtil.newScan(originalScan);
-        StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
-        QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
-        PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
-        
-        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE);
-        List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
-        List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
-        
-        boolean needsMerge = rhsJoin.hasPostReference();
-        int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
-        PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
+            case SORT_MERGE: {
+                JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+                JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+                JoinType type = lastJoinSpec.getType();
+                JoinTable rhsJoin = lastJoinSpec.getJoinTable();
+                if (type == JoinType.Right) {
+                    JoinTable temp = lhsJoin;
+                    lhsJoin = rhsJoin;
+                    rhsJoin = temp;
+                }
 
-        ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
-        TableRef tableRef = resolver.getTables().get(0);
-        StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
-        subCtx.setCurrentTable(tableRef);
-        QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
-        context.setCurrentTable(tableRef);
-        context.setResolver(resolver);
-        TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
-        ParseNode where = joinTable.getPostFiltersCombined();
-        SelectStatement select = asSubquery
-                ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
-                        Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, null, 0, false,
-                        joinTable.getStatement().hasSequence(), Collections.<SelectStatement> emptyList(),
+                List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
+                List<OrderByNode> lhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size());
+                List<OrderByNode> rhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size());
+                for (EqualParseNode condition : joinConditionNodes) {
+                    lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
+                    rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
+                }
+
+                Scan lhsScan = ScanUtil.newScan(originalScan);
+                StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
+                boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
+                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
+                PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
+                boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
+
+                Scan rhsScan = ScanUtil.newScan(originalScan);
+                StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
+                QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
+                PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
+
+                Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy);
+                List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
+                List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
+
+                boolean needsMerge = rhsJoin.hasPostReference();
+                int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
+                PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
+
+                ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
+                TableRef tableRef = resolver.getTables().get(0);
+                StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
+                subCtx.setCurrentTable(tableRef);
+                QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
+                context.setCurrentTable(tableRef);
+                context.setResolver(resolver);
+                TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
+                ParseNode where = joinTable.getPostFiltersCombined();
+                SelectStatement select = asSubquery
+                        ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
+                        Collections.<AliasedNode>emptyList(), where, null, null, orderBy, null, null, 0, false,
+                        joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(),
                         joinTable.getStatement().getUdfParseNodes())
-                : NODE_FACTORY.select(joinTable.getStatement(), from, where);
-        
-        return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+                        : NODE_FACTORY.select(joinTable.getStatement(), from, where);
+
+                return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+            }
+            default:
+                throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
+        }
     }
     
     private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, SelectStatement select, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index ca88984..c2edaf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.optimize.Cost;
@@ -90,4 +91,5 @@ public interface QueryPlan extends StatementPlan {
      */
     public boolean useRoundRobinIterator() throws SQLException;
 
+    <T> T accept(QueryPlanVisitor<T> visitor);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 5cd1d08..18060f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
@@ -276,6 +277,11 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     public Long getEstimatedRowsToScan() {
         return 0l;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6914d54d/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 2e042e7..0c8e8dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -33,6 +33,10 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.RowKeyExpression;
@@ -117,25 +121,39 @@ public class AggregatePlan extends BaseQueryPlan {
 
     @Override
     public Cost getCost() {
-        Long byteCount = null;
+        Double outputBytes = this.accept(new ByteCountVisitor());
+        Double rowWidth = this.accept(new AvgRowWidthVisitor());
+        Long inputRows = null;
         try {
-            byteCount = getEstimatedBytesToScan();
+            inputRows = getEstimatedRowsToScan();
         } catch (SQLException e) {
             // ignored.
         }
-
-        if (byteCount == null) {
+        if (inputRows == null || outputBytes == null || rowWidth == null) {
             return Cost.UNKNOWN;
         }
+        double inputBytes = inputRows * rowWidth;
+        double rowsBeforeHaving = RowCountVisitor.aggregate(
+                                    RowCountVisitor.filter(
+                                            inputRows.doubleValue(),
+                                            RowCountVisitor.stripSkipScanFilter(
+                                                    context.getScan().getFilter())),
+                                    groupBy);
+        double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
+        double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+        double bytesAfterHaving = rowWidth * rowsAfterHaving;
 
         int parallelLevel = CostUtil.estimateParallelLevel(
                 true, context.getConnection().getQueryServices());
-        Cost cost = CostUtil.estimateAggregateCost(byteCount,
-                groupBy, aggregators.getEstimatedByteSize(), parallelLevel);
+        Cost cost = new Cost(0, 0, inputBytes);
+        Cost aggCost = CostUtil.estimateAggregateCost(
+                inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
+        cost = cost.plus(aggCost);
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            double outputBytes = CostUtil.estimateAggregateOutputBytes(
-                    byteCount, groupBy, aggregators.getEstimatedByteSize());
-            Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+            parallelLevel = CostUtil.estimateParallelLevel(
+                    false, context.getConnection().getQueryServices());
+            Cost orderByCost = CostUtil.estimateOrderByCost(
+                    bytesAfterHaving, outputBytes, parallelLevel);
             cost = cost.plus(orderByCost);
         }
         return cost;
@@ -304,4 +322,9 @@ public class AggregatePlan extends BaseQueryPlan {
         return false;
     }
 
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
 }


[5/9] phoenix git commit: PHOENIX-3505 Avoid NPE on close() in OrderedResultIterator

Posted by ma...@apache.org.
PHOENIX-3505 Avoid NPE on close() in OrderedResultIterator


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2c758234
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2c758234
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2c758234

Branch: refs/heads/4.x-HBase-0.98
Commit: 2c758234186e5a4d70cdc6501df19f9b0d9ec601
Parents: 7ef96fe
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 23 11:16:35 2016 -0500
Committer: maryannxue <ma...@gmail.com>
Committed: Tue Mar 13 20:08:46 2018 -0700

----------------------------------------------------------------------
 .../phoenix/iterate/OrderedResultIterator.java  |  5 ++-
 .../iterate/OrderedResultIteratorTest.java      | 41 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c758234/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 36ca00b..36b274a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -280,7 +280,10 @@ public class OrderedResultIterator implements PeekingResultIterator {
 
     @Override
     public void close() throws SQLException {
-        resultIterator.close();
+        // Guard against resultIterator being null
+        if (null != resultIterator) {
+            resultIterator.close();
+        }
         resultIterator = PeekingResultIterator.EMPTY_ITERATOR;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c758234/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
new file mode 100644
index 0000000..50ed8e9
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.expression.OrderByExpression;
+import org.junit.Test;
+
+/**
+ * Test class for {@link OrderedResultIterator}.
+ */
+public class OrderedResultIteratorTest {
+
+  @Test
+  public void testNullIteratorOnClose() throws SQLException {
+      ResultIterator delegate =  ResultIterator.EMPTY_ITERATOR;
+      List<OrderByExpression> orderByExpressions = Collections.singletonList(null);
+      int thresholdBytes = Integer.MAX_VALUE;
+      OrderedResultIterator iterator = new OrderedResultIterator(delegate, orderByExpressions, thresholdBytes);
+      // Should not throw an exception
+      iterator.close();
+  }
+
+}