You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2018/02/12 22:07:45 UTC

[2/2] phoenix git commit: PHOENIX-1556 Base hash versus sort merge join decision on cost

PHOENIX-1556 Base hash versus sort merge join decision on cost


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9461d0d6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9461d0d6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9461d0d6

Branch: refs/heads/master
Commit: 9461d0d6a299bb3cbcf53905d7e9b73895a99299
Parents: a6bf735
Author: maryannxue <ma...@gmail.com>
Authored: Mon Feb 12 14:07:30 2018 -0800
Committer: maryannxue <ma...@gmail.com>
Committed: Mon Feb 12 14:07:30 2018 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/CostBasedDecisionIT.java    | 420 ++++++++++++-----
 .../apache/phoenix/compile/JoinCompiler.java    |  43 ++
 .../phoenix/compile/ListJarsQueryPlan.java      |   8 +-
 .../apache/phoenix/compile/QueryCompiler.java   | 449 ++++++++++---------
 .../org/apache/phoenix/compile/QueryPlan.java   |   2 +
 .../apache/phoenix/compile/TraceQueryPlan.java  |   6 +
 .../apache/phoenix/execute/AggregatePlan.java   |  41 +-
 .../phoenix/execute/ClientAggregatePlan.java    |  46 +-
 .../phoenix/execute/ClientProcessingPlan.java   |   4 +
 .../apache/phoenix/execute/ClientScanPlan.java  |  22 +-
 .../apache/phoenix/execute/CorrelatePlan.java   |  26 +-
 .../apache/phoenix/execute/CursorFetchPlan.java |   6 +
 .../apache/phoenix/execute/HashJoinPlan.java    | 128 ++++--
 .../execute/LiteralResultIterationPlan.java     |   6 +
 .../org/apache/phoenix/execute/ScanPlan.java    |  14 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |  20 +-
 .../phoenix/execute/TupleProjectionPlan.java    |   6 +
 .../org/apache/phoenix/execute/UnionPlan.java   |  12 +-
 .../apache/phoenix/execute/UnnestArrayPlan.java |   6 +
 .../execute/visitor/AvgRowWidthVisitor.java     | 205 +++++++++
 .../execute/visitor/ByteCountVisitor.java       | 125 ++++++
 .../execute/visitor/QueryPlanVisitor.java       |  46 ++
 .../execute/visitor/RowCountVisitor.java        | 335 ++++++++++++++
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   6 +
 .../java/org/apache/phoenix/util/CostUtil.java  |  61 +--
 .../query/ParallelIteratorsSplitTest.java       |   6 +
 26 files changed, 1612 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
index a3584ce..493855a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -32,12 +32,16 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
 
 public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+    private final String testTable500;
+    private final String testTable990;
+    private final String testTable1000;
 
     @BeforeClass
     public static void doSetup() throws Exception {
@@ -46,9 +50,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
         props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
         props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
         props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true));
+        props.put(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, Long.toString(150000));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
+    public CostBasedDecisionIT() throws Exception {
+        testTable500 = initTestTableValues(500);
+        testTable990 = initTestTableValues(990);
+        testTable1000 = initTestTableValues(1000);
+    }
+
     @Test
     public void testCostOverridesStaticPlanOrdering1() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -64,10 +75,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
 
             String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey";
             // Use the data table plan that opts out order-by when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
-                    plan.contains("FULL SCAN"));
+            verifyQueryPlan(query, "FULL SCAN");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -81,10 +89,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the index table plan that has a lower cost when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
-                    plan.contains("RANGE SCAN"));
+            verifyQueryPlan(query, "RANGE SCAN");
         } finally {
             conn.close();
         }
@@ -103,12 +108,12 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
                     "c2 VARCHAR)");
             conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
 
-            String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1";
             // Use the index table plan that opts out order-by when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
-                    plan.contains("RANGE SCAN"));
+            verifyQueryPlan(query,
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
+                    "    SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+                    "CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -124,10 +129,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             // Given that the range on C1 is meaningless and group-by becomes
             // order-preserving if using the data table, the data table plan should
             // come out as the best plan based on the costs.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
-                    plan.contains("FULL SCAN"));
+            verifyQueryPlan(query,
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+                    "    SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
+                    "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+                    "CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -150,14 +156,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
 
             String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
             // Use the idx2 plan with a wider PK slot span when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String indexPlan =
+            verifyQueryPlan(query,
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
                     "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
-                    "CLIENT MERGE SORT";
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+                    "CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -171,14 +173,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the idx2 plan that scans less data when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String dataPlan =
+            verifyQueryPlan(query,
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
                     "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
-                    "CLIENT MERGE SORT";
-            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(dataPlan));
+                    "CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -201,15 +199,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
 
             String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
             // Use the idx2 plan with a wider PK slot span when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String indexPlan =
+            verifyQueryPlan(query,
                     "UPSERT SELECT\n" +
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
                             "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
-                            "CLIENT MERGE SORT";
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+                            "CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -223,15 +217,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the idx2 plan that scans less data when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String dataPlan =
+            verifyQueryPlan(query,
                     "UPSERT SELECT\n" +
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
                             "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
-                            "CLIENT MERGE SORT";
-            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(dataPlan));
+                            "CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -254,15 +244,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
 
             String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
             // Use the idx2 plan with a wider PK slot span when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String indexPlan =
+            verifyQueryPlan(query,
                     "DELETE ROWS\n" +
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
                             "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
-                            "CLIENT MERGE SORT";
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+                            "CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -276,15 +262,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the idx2 plan that scans less data when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String dataPlan =
+            verifyQueryPlan(query,
                     "DELETE ROWS\n" +
                     "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
                             "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
-                            "CLIENT MERGE SORT";
-            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(dataPlan));
+                            "CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -303,22 +285,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
                     "c2 VARCHAR)");
             conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
 
-            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 "
-                    + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1 "
+                    + "UNION ALL SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey >= 'a' GROUP BY c1";
             // Use the default plan when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String defaultPlan =
+            verifyQueryPlan(query,
                     "UNION ALL OVER 2 QUERIES\n" +
-                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
                     "        SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
                     "    CLIENT MERGE SORT\n" +
-                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
-                    "        SERVER FILTER BY FIRST KEY ONLY\n" +
-                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
-                    "    CLIENT MERGE SORT";
-            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(defaultPlan));
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['a'] - [*]\n" +
+                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+                    "    CLIENT MERGE SORT");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -332,19 +309,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the optimal plan based on cost when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String optimizedPlan =
+            verifyQueryPlan(query,
                     "UNION ALL OVER 2 QUERIES\n" +
                     "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
-                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
                     "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
                     "    CLIENT MERGE SORT\n" +
-                    "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
-                    "        SERVER FILTER BY C1 LIKE 'X%'\n" +
-                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]";
-            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(optimizedPlan));
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" >= 'a'\n" +
+                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+                    "    CLIENT MERGE SORT");
         } finally {
             conn.close();
         }
@@ -363,23 +337,18 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
                     "c2 VARCHAR)");
             conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
 
-            String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 "
-                    + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
-                    + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
+            String query = "SELECT t1.rowkey, t1.c1, t1.c2, t2.c1, mc2 FROM " + tableName + " t1 "
+                    + "JOIN (SELECT c1, max(rowkey) mrk, max(c2) mc2 FROM " + tableName + " where rowkey <= 'z' GROUP BY c1) t2 "
+                    + "ON t1.rowkey = t2.mrk WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
             // Use the default plan when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            String defaultPlan =
+            verifyQueryPlan(query,
                     "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
                     "    SERVER FILTER BY C1 LIKE 'X0%'\n" +
                     "    PARALLEL INNER-JOIN TABLE 0\n" +
-                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
-                    "            SERVER FILTER BY FIRST KEY ONLY\n" +
-                    "            SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" +
+                    "            SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
                     "        CLIENT MERGE SORT\n" +
-                    "    DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
-            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(defaultPlan));
+                    "    DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.MRK)");
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -393,20 +362,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the optimal plan based on cost when stats become available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            String optimizedPlan =
+            verifyQueryPlan(query,
                     "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" +
                     "    SERVER FILTER BY FIRST KEY ONLY\n" +
                     "    SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
                     "CLIENT MERGE SORT\n" +
                     "    PARALLEL INNER-JOIN TABLE 0\n" +
-                    "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
-                    "            SERVER FILTER BY C1 LIKE 'X%'\n" +
-                    "            SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" +
-                    "    DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)";
-            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(optimizedPlan));
+                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+                    "            SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" +
+                    "            SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+                    "        CLIENT MERGE SORT\n" +
+                    "    DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.MRK)");
         } finally {
             conn.close();
         }
@@ -432,10 +398,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
 
             // Use the index table plan that opts out order-by when stats are not available.
-            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
-            String plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+            verifyQueryPlan(query, indexPlan);
 
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
             for (int i = 0; i < 10000; i++) {
@@ -449,18 +412,261 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPDATE STATISTICS " + tableName);
 
             // Use the data table plan that has a lower cost when stats are available.
-            rs = conn.createStatement().executeQuery("explain " + query);
-            plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(dataPlan));
+            verifyQueryPlan(query, dataPlan);
 
             // Use the index table plan as has been hinted.
-            rs = conn.createStatement().executeQuery("explain " + hintedQuery);
-            plan = QueryUtil.getExplainPlan(rs);
-            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
-                    plan.contains(indexPlan));
+            verifyQueryPlan(hintedQuery, indexPlan);
         } finally {
             conn.close();
         }
     }
+
+    /** Sort-merge-join w/ both children ordered wins over hash-join. */
+    @Test
+    public void testJoinStrategy() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.ID";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000;
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Sort-merge-join w/ both children ordered wins over hash-join in an un-grouped aggregate query. */
+    @Test
+    public void testJoinStrategy2() throws Exception {
+        String q = "SELECT count(*)\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.ID\n" +
+                "WHERE t1.COL1 < 200";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+                "        SERVER FILTER BY COL1 < 200\n" +
+                "AND (SKIP MERGE)\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "        SERVER FILTER BY FIRST KEY ONLY\n" +
+                "CLIENT AGGREGATE INTO SINGLE ROW";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Hash-join w/ PK/FK optimization wins over sort-merge-join w/ larger side ordered. */
+    @Test
+    public void testJoinStrategy3() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.COL1 = t2.ID\n" +
+                "WHERE t1.ID > 200";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+                "    DYNAMIC SERVER FILTER BY T2.ID IN (T1.COL1)";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Hash-join w/ PK/FK optimization wins over hash-join w/o PK/FK optimization when two sides are close in size. */
+    @Test
+    public void testJoinStrategy4() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable990 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.COL1";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Hash-join wins over sort-merge-join w/ smaller side ordered. */
+    @Test
+    public void testJoinStrategy5() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.COL1\n" +
+                "WHERE t1.ID > 200";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /** Hash-join wins over sort-merge-join w/o any side ordered. */
+    @Test
+    public void testJoinStrategy6() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.COL1 = t2.COL1\n" +
+                "WHERE t1.ID > 200";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Hash-join wins over sort-merge-join w/ both sides ordered in an order-by query.
+     * This is because order-by can only be done on the client side after sort-merge-join
+     * and order-by w/o limit on the client side is very expensive.
+     */
+    @Test
+    public void testJoinStrategy7() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.ID\n" +
+                "ORDER BY t1.COL1";
+        String expected =
+                "CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    SERVER SORTED BY [T1.COL1]\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+                "    DYNAMIC SERVER FILTER BY T2.ID IN (T1.ID)";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Sort-merge-join w/ both sides ordered wins over hash-join in an order-by limit query.
+     * This is because order-by can only be done on the client side after sort-merge-join
+     * but order-by w/ limit on the client side is less expensive.
+     */
+    @Test
+    public void testJoinStrategy8() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" +
+                "ON t1.ID = t2.ID\n" +
+                "ORDER BY t1.COL1 LIMIT 5";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "CLIENT TOP 5 ROWS SORTED BY [T1.COL1]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Multi-table join: sort-merge-join chosen since all join keys are PK.
+     */
+    @Test
+    public void testJoinStrategy9() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable1000 + " t1 LEFT JOIN " + testTable500 + " t2\n" +
+                "ON t1.ID = t2.ID AND t2.ID > 200\n" +
+                "LEFT JOIN " + testTable990 + " t3\n" +
+                "ON t1.ID = t3.ID AND t3.ID < 100";
+        String expected =
+                "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+                "    SORT-MERGE-JOIN (LEFT) TABLES\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    AND\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Multi-table join: a mix of join strategies chosen based on cost.
+     */
+    @Test
+    public void testJoinStrategy10() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" +
+                "ON t1.ID = t2.COL1 AND t2.ID > 200\n" +
+                "JOIN " + testTable990 + " t3\n" +
+                "ON t1.ID = t3.ID AND t3.ID < 100";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "        PARALLEL INNER-JOIN TABLE 0\n" +
+                "            CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+                "        DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Multi-table join: hash-join two tables in parallel since two RHS tables are both small
+     * and can fit in memory at the same time.
+     */
+    @Test
+    public void testJoinStrategy11() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" +
+                "ON t1.COL2 = t2.COL1 AND t2.ID > 200\n" +
+                "JOIN " + testTable990 + " t3\n" +
+                "ON t1.COL1 = t3.COL2 AND t3.ID < 100";
+        String expected =
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" +
+                "    PARALLEL INNER-JOIN TABLE 1\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]";
+        verifyQueryPlan(q, expected);
+    }
+
+    /**
+     * Multi-table join: similar to {@link #testJoinStrategy11()}, but the two RHS
+     * tables cannot fit in memory at the same time, and thus a mix of join strategies
+     * is chosen based on cost.
+     */
+    @Test
+    public void testJoinStrategy12() throws Exception {
+        String q = "SELECT *\n" +
+                "FROM " + testTable1000 + " t1 JOIN " + testTable990 + " t2\n" +
+                "ON t1.COL2 = t2.COL1\n" +
+                "JOIN " + testTable990 + " t3\n" +
+                "ON t1.COL1 = t3.COL2";
+        String expected =
+                "SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "    CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" +
+                "        SERVER SORTED BY [T1.COL1]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "        PARALLEL INNER-JOIN TABLE 0\n" +
+                "            CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" +
+                "AND\n" +
+                "    CLIENT PARALLEL 991-WAY FULL SCAN OVER " + testTable990 + "\n" +
+                "        SERVER SORTED BY [T3.COL2]\n" +
+                "    CLIENT MERGE SORT";
+        verifyQueryPlan(q, expected);
+    }
+
+    private static void verifyQueryPlan(String query, String expected) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // closed even if the assertion fails
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + expected + "' in the plan:\n" + plan + ".", plan.contains(expected));
+        }
+    }
+
+    private static String initTestTableValues(int rows) throws Exception {
+        Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
+            final String table = generateUniqueName();
+            final String ddl = "CREATE TABLE " + table + " (\n" +
+                    "ID INTEGER NOT NULL PRIMARY KEY,\n" +
+                    "COL1 INTEGER," +
+                    "COL2 INTEGER)";
+            connection.createStatement().execute(ddl);
+            PreparedStatement upsert = connection.prepareStatement("UPSERT INTO " + table + " VALUES(?, ?, ?)");
+            for (int row = 0; row < rows; row++) {
+                upsert.setInt(1, row + 1);      // ID ascends from 1 to rows
+                upsert.setInt(2, rows - row);   // COL1 descends from rows to 1
+                upsert.setInt(3, rows + row);   // COL2 ascends from rows to 2 * rows - 1
+                upsert.execute();
+            }
+            connection.commit();
+            connection.createStatement().execute("UPDATE STATISTICS " + table); // gather stats on the new rows
+            return table;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f3c4c24..f5a7e39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -71,6 +71,8 @@ import org.apache.phoenix.parse.TableNodeVisitor;
 import org.apache.phoenix.parse.TableWildcardParseNode;
 import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.parse.WildcardParseNode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.LocalIndexDataColumnRef;
@@ -124,6 +126,8 @@ public class JoinCompiler {
     private final ColumnResolver origResolver;
     private final boolean useStarJoin;
     private final Map<ColumnRef, ColumnRefType> columnRefs;
+    private final boolean useSortMergeJoin;
+    private final boolean costBased;
 
 
     private JoinCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) {
@@ -132,6 +136,9 @@ public class JoinCompiler {
         this.origResolver = resolver;
         this.useStarJoin = !select.getHint().hasHint(Hint.NO_STAR_JOIN);
         this.columnRefs = new HashMap<ColumnRef, ColumnRefType>();
+        this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
+        this.costBased = statement.getConnection().getQueryServices().getProps().getBoolean(
+                QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
     }
 
     public static JoinTable compile(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
@@ -365,6 +372,42 @@ public class JoinCompiler {
         }
 
         /**
+         * Return a list of all applicable join strategies. The order of the strategies in the
+         * returned list is based on the static rule below. However, the caller can decide on
+         * an optimal join strategy by evaluating and comparing the costs.
+         * 1. If hint USE_SORT_MERGE_JOIN is specified,
+         *    return a singleton list containing only SORT_MERGE.
+         * 2. If 1) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B"; or
+         *       2) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B (LEFT/INNER/SEMI/ANTI JOIN C)+"
+         *          and hint NO_STAR_JOIN is not specified,
+         *    add BUILD_RIGHT to the returned list.
+         * 3. If it matches pattern "A RIGHT/INNER JOIN B", where B is either a named table reference
+         *    or a flat sub-query,
+         *    add BUILD_LEFT to the returned list.
+         * 4. add SORT_MERGE to the returned list.
+         */
+        public List<Strategy> getApplicableJoinStrategies() {
+            List<Strategy> applicable = Lists.newArrayList();
+            if (!useSortMergeJoin) {
+                // Hash strategies are candidates only when USE_SORT_MERGE_JOIN is not hinted.
+                if (getStarJoinVector() != null) {
+                    applicable.add(Strategy.HASH_BUILD_RIGHT);
+                }
+                JoinSpec tail = joinSpecs.get(joinSpecs.size() - 1);
+                JoinType tailType = tail.getType();
+                JoinTable tailTable = tail.getJoinTable();
+                boolean rightOrInner = tailType == JoinType.Right || tailType == JoinType.Inner;
+                if (rightOrInner && tailTable.getJoinSpecs().isEmpty() && tailTable.getTable().isFlat()) {
+                    applicable.add(Strategy.HASH_BUILD_LEFT);
+                }
+            }
+            // SORT_MERGE is applicable on either path and is always listed last.
+            applicable.add(Strategy.SORT_MERGE);
+
+            return applicable;
+        }
+
+        /**
          * Returns a boolean vector indicating whether the evaluation of join expressions
          * can be evaluated at an early stage if the input JoinSpec can be taken as a
          * star join. Otherwise returns null.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 0688b94..e3ed110 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
@@ -251,7 +252,12 @@ public class ListJarsQueryPlan implements QueryPlan {
         return false;
     }
 
-	@Override
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
 	public Set<TableRef> getSourceRefs() {
 		return Collections.<TableRef>emptySet();
 	}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3b14850..243f03e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.client.Query;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -52,6 +53,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.EqualParseNode;
 import org.apache.phoenix.parse.HintNode.Hint;
@@ -63,7 +65,10 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SubqueryParseNode;
 import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PDatum;
@@ -102,9 +107,9 @@ public class QueryCompiler {
     private final ParallelIteratorFactory parallelIteratorFactory;
     private final SequenceManager sequenceManager;
     private final boolean projectTuples;
-    private final boolean useSortMergeJoin;
     private final boolean noChildParentJoinOptimization;
     private final QueryPlan dataPlan;
+    private final boolean costBased;
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
         this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan);
@@ -119,9 +124,10 @@ public class QueryCompiler {
         this.parallelIteratorFactory = parallelIteratorFactory;
         this.sequenceManager = sequenceManager;
         this.projectTuples = projectTuples;
-        this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
         this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION);
-        if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
+        ConnectionQueryServices services = statement.getConnection().getQueryServices();
+        this.costBased = services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+        if (services.getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
             this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
         }
         if (select.getHint().hasHint(Hint.NO_CACHE)) {
@@ -201,41 +207,17 @@ public class QueryCompiler {
         }
     }
 
-    /*
+    /**
      * Call compileJoinQuery() for join queries recursively down to the leaf JoinTable nodes.
-     * This matches the input JoinTable node against patterns in the following order:
-     * 1. A (leaf JoinTable node, which can be a named table reference or a subquery of any kind.)
-     *    Returns the compilation result of a single table scan or of an independent subquery.
-     * 2. Matching either of (when hint USE_SORT_MERGE_JOIN not specified):
-     *        1) A LEFT/INNER JOIN B
-     *        2) A LEFT/INNER JOIN B (LEFT/INNER JOIN C)+, if hint NO_STAR_JOIN not specified
-     *        where A can be a named table reference or a flat subquery, and B, C, ... can be a named
-     *        table reference, a sub-join or a subquery of any kind.
-     *    Returns a HashJoinPlan{scan: A, hash: B, C, ...}.
-     * 3. Matching pattern:
-     *        A RIGHT/INNER JOIN B (when hint USE_SORT_MERGE_JOIN not specified)
-     *        where B can be a named table reference or a flat subquery, and A can be a named table
-     *        reference, a sub-join or a subquery of any kind.
-     *    Returns a HashJoinPlan{scan: B, hash: A}.
-     *    NOTE that "A LEFT/RIGHT/INNER/FULL JOIN B RIGHT/INNER JOIN C" is viewed as
-     *    "(A LEFT/RIGHT/INNER/FULL JOIN B) RIGHT/INNER JOIN C" here, which means the left part in the
-     *    parenthesis is considered a sub-join.
-     *    viewed as a sub-join.
-     * 4. All the rest that do not qualify for previous patterns or conditions, including FULL joins.
-     *    Returns a SortMergeJoinPlan, the sorting part of which is pushed down to the JoinTable nodes
-     *    of both sides as order-by clauses.
-     * NOTE that SEMI or ANTI joins are treated the same way as LEFT joins in JoinTable pattern matching.
-     *    
-     * If no join algorithm hint is provided, according to the above compilation process, a join query 
-     * plan can probably consist of both HashJoinPlan and SortMergeJoinPlan which may enclose each other.
-     * TODO 1) Use table statistics to guide the choice of join plans.
-     *      2) Make it possible to hint a certain join algorithm for a specific join step.
+     * If it is a leaf node, call compileSingleFlatQuery() or compileSubquery(), otherwise:
+     *      1) If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, return the
+     *         join plan with the best cost. Note that the "best" plan is only locally optimal,
+     *         and might or might not be globally optimal.
+     *      2) Otherwise, return the join plan compiled with the default strategy.
+     * @see JoinCompiler.JoinTable#getApplicableJoinStrategies()
      */
-    @SuppressWarnings("unchecked")
     protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
-        byte[] emptyByteArray = new byte[0];
-        List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
-        if (joinSpecs.isEmpty()) {
+        if (joinTable.getJoinSpecs().isEmpty()) {
             Table table = joinTable.getTable();
             SelectStatement subquery = table.getAsSubquery(orderBy);
             if (!table.isSubselect()) {
@@ -253,198 +235,229 @@ public class QueryCompiler {
             return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), table.compilePostFilterExpression(context));
         }
 
-        boolean[] starJoinVector;
-        if (!this.useSortMergeJoin && (starJoinVector = joinTable.getStarJoinVector()) != null) {
-            Table table = joinTable.getTable();
-            PTable initialProjectedTable;
-            TableRef tableRef;
-            SelectStatement query;
-            TupleProjector tupleProjector;
-            if (!table.isSubselect()) {
-                context.setCurrentTable(table.getTableRef());
-                initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
-                tableRef = table.getTableRef();
-                table.projectColumns(context.getScan());
-                query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
-                tupleProjector = new TupleProjector(initialProjectedTable);
-            } else {
-                SelectStatement subquery = table.getAsSubquery(orderBy);
-                QueryPlan plan = compileSubquery(subquery, false);
-                initialProjectedTable = table.createProjectedTable(plan.getProjector());
-                tableRef = plan.getTableRef();
-                context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
-                query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
-                tupleProjector = new TupleProjector(plan.getProjector());
+        List<JoinCompiler.Strategy> strategies = joinTable.getApplicableJoinStrategies();
+        assert strategies.size() > 0;
+        if (!costBased || strategies.size() == 1) {
+            return compileJoinQuery(
+                    strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+        }
+
+        QueryPlan bestPlan = null;
+        Cost bestCost = null;
+        for (JoinCompiler.Strategy strategy : strategies) {
+            StatementContext newContext = new StatementContext(
+                    context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager());
+            QueryPlan plan = compileJoinQuery(
+                    strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy);
+            Cost cost = plan.getCost();
+            if (bestPlan == null || cost.compareTo(bestCost) < 0) {
+                bestPlan = plan;
+                bestCost = cost;
             }
-            context.setCurrentTable(tableRef);
-            PTable projectedTable = initialProjectedTable;
-            int count = joinSpecs.size();
-            ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
-            List<Expression>[] joinExpressions = new List[count];
-            JoinType[] joinTypes = new JoinType[count];
-            PTable[] tables = new PTable[count];
-            int[] fieldPositions = new int[count];
-            StatementContext[] subContexts = new StatementContext[count];
-            QueryPlan[] subPlans = new QueryPlan[count];
-            HashSubPlan[] hashPlans = new HashSubPlan[count];
-            fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
-            for (int i = 0; i < count; i++) {
-                JoinSpec joinSpec = joinSpecs.get(i);
-                Scan subScan = ScanUtil.newScan(originalScan);
-                subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
-                boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
-                if (hasPostReference) {
-                    tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
-                    projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
+        }
+        context.setResolver(bestPlan.getContext().getResolver());
+        context.setCurrentTable(bestPlan.getContext().getCurrentTable());
+        return bestPlan;
+    }
+
+    protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
+        byte[] emptyByteArray = new byte[0];
+        List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
+        switch (strategy) {
+            case HASH_BUILD_RIGHT: {
+                boolean[] starJoinVector = joinTable.getStarJoinVector();
+                Table table = joinTable.getTable();
+                PTable initialProjectedTable;
+                TableRef tableRef;
+                SelectStatement query;
+                TupleProjector tupleProjector;
+                if (!table.isSubselect()) {
+                    context.setCurrentTable(table.getTableRef());
+                    initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
+                    tableRef = table.getTableRef();
+                    table.projectColumns(context.getScan());
+                    query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
+                    tupleProjector = new TupleProjector(initialProjectedTable);
                 } else {
-                    tables[i] = null;
+                    SelectStatement subquery = table.getAsSubquery(orderBy);
+                    QueryPlan plan = compileSubquery(subquery, false);
+                    initialProjectedTable = table.createProjectedTable(plan.getProjector());
+                    tableRef = plan.getTableRef();
+                    context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+                    query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+                    tupleProjector = new TupleProjector(plan.getProjector());
                 }
-            }
-            for (int i = 0; i < count; i++) {
-                JoinSpec joinSpec = joinSpecs.get(i);
-                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
-                joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
-                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT);
-                joinExpressions[i] = joinConditions.getFirst();
-                List<Expression> hashExpressions = joinConditions.getSecond();
-                Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
-                boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
-                Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
-                Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
-                joinTypes[i] = joinSpec.getType();
-                if (i < count - 1) {
-                    fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+                context.setCurrentTable(tableRef);
+                PTable projectedTable = initialProjectedTable;
+                int count = joinSpecs.size();
+                ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
+                List<Expression>[] joinExpressions = new List[count];
+                JoinType[] joinTypes = new JoinType[count];
+                PTable[] tables = new PTable[count];
+                int[] fieldPositions = new int[count];
+                StatementContext[] subContexts = new StatementContext[count];
+                QueryPlan[] subPlans = new QueryPlan[count];
+                HashSubPlan[] hashPlans = new HashSubPlan[count];
+                fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
+                for (int i = 0; i < count; i++) {
+                    JoinSpec joinSpec = joinSpecs.get(i);
+                    Scan subScan = ScanUtil.newScan(originalScan);
+                    subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+                    subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
+                    boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
+                    if (hasPostReference) {
+                        tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
+                        projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
+                    } else {
+                        tables[i] = null;
+                    }
+                }
+                for (int i = 0; i < count; i++) {
+                    JoinSpec joinSpec = joinSpecs.get(i);
+                    context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
+                    joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
+                    Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], strategy);
+                    joinExpressions[i] = joinConditions.getFirst();
+                    List<Expression> hashExpressions = joinConditions.getSecond();
+                    Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
+                    boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
+                    Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
+                    Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
+                    joinTypes[i] = joinSpec.getType();
+                    if (i < count - 1) {
+                        fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+                    }
+                    hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+                }
+                TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+                QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+                Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
+                Integer limit = null;
+                Integer offset = null;
+                if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
+                    limit = plan.getLimit();
+                    offset = plan.getOffset();
                 }
-                hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+                HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
+                        starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+                return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
             }
-            TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
-            QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
-            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
-            Integer limit = null;
-            Integer offset = null;
-            if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
-                limit = plan.getLimit();
-                offset = plan.getOffset();
+            case HASH_BUILD_LEFT: {
+                JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+                JoinType type = lastJoinSpec.getType();
+                JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
+                Table rhsTable = rhsJoinTable.getTable();
+                JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+                Scan subScan = ScanUtil.newScan(originalScan);
+                StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
+                PTable rhsProjTable;
+                TableRef rhsTableRef;
+                SelectStatement rhs;
+                TupleProjector tupleProjector;
+                if (!rhsTable.isSubselect()) {
+                    context.setCurrentTable(rhsTable.getTableRef());
+                    rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
+                    rhsTableRef = rhsTable.getTableRef();
+                    rhsTable.projectColumns(context.getScan());
+                    rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
+                    tupleProjector = new TupleProjector(rhsProjTable);
+                } else {
+                    SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
+                    QueryPlan plan = compileSubquery(subquery, false);
+                    rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
+                    rhsTableRef = plan.getTableRef();
+                    context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+                    rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+                    tupleProjector = new TupleProjector(plan.getProjector());
+                }
+                context.setCurrentTable(rhsTableRef);
+                context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
+                ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[]{new ImmutableBytesPtr(emptyByteArray)};
+                Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, strategy);
+                List<Expression> joinExpressions = joinConditions.getSecond();
+                List<Expression> hashExpressions = joinConditions.getFirst();
+                boolean needsMerge = lhsJoin.hasPostReference();
+                PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
+                int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
+                PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
+                TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
+                QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+                Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
+                Integer limit = null;
+                Integer offset = null;
+                if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
+                    limit = rhsPlan.getLimit();
+                    offset = rhsPlan.getOffset();
+                }
+                HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions},
+                        new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true},
+                        new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+                Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
+                getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
+                return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
             }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
-                    starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
-            return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
-        }
+            case SORT_MERGE: {
+                JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+                JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
+                JoinType type = lastJoinSpec.getType();
+                JoinTable rhsJoin = lastJoinSpec.getJoinTable();
+                if (type == JoinType.Right) {
+                    JoinTable temp = lhsJoin;
+                    lhsJoin = rhsJoin;
+                    rhsJoin = temp;
+                }
 
-        JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
-        JoinType type = lastJoinSpec.getType();
-        if (!this.useSortMergeJoin 
-                && (type == JoinType.Right || type == JoinType.Inner) 
-                && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
-                && lastJoinSpec.getJoinTable().getTable().isFlat()) {
-            JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
-            Table rhsTable = rhsJoinTable.getTable();
-            JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
-            Scan subScan = ScanUtil.newScan(originalScan);
-            StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-            QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
-            PTable rhsProjTable;
-            TableRef rhsTableRef;
-            SelectStatement rhs;
-            TupleProjector tupleProjector;
-            if (!rhsTable.isSubselect()) {
-                context.setCurrentTable(rhsTable.getTableRef());
-                rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
-                rhsTableRef = rhsTable.getTableRef();
-                rhsTable.projectColumns(context.getScan());
-                rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
-                tupleProjector = new TupleProjector(rhsProjTable);
-            } else {
-                SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
-                QueryPlan plan = compileSubquery(subquery, false);
-                rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
-                rhsTableRef = plan.getTableRef();
-                context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
-                rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
-                tupleProjector = new TupleProjector(plan.getProjector());
-            }
-            context.setCurrentTable(rhsTableRef);
-            context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
-            ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
-            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT);
-            List<Expression> joinExpressions = joinConditions.getSecond();
-            List<Expression> hashExpressions = joinConditions.getFirst();
-            boolean needsMerge = lhsJoin.hasPostReference();
-            PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
-            int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
-            PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
-            TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
-            context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
-            QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
-            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
-            Integer limit = null;
-            Integer offset = null;
-            if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
-                limit = rhsPlan.getLimit();
-                offset = rhsPlan.getOffset();
-            }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] { joinExpressions },
-                    new JoinType[] { type == JoinType.Right ? JoinType.Left : type }, new boolean[] { true },
-                    new PTable[] { lhsTable }, new int[] { fieldPosition }, postJoinFilterExpression,  QueryUtil.getOffsetLimit(limit, offset));
-            Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
-            getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
-            return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
-        }
+                List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
+                List<OrderByNode> lhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size());
+                List<OrderByNode> rhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size());
+                for (EqualParseNode condition : joinConditionNodes) {
+                    lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
+                    rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
+                }
 
-        JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
-        JoinTable rhsJoin = lastJoinSpec.getJoinTable();        
-        if (type == JoinType.Right) {
-            JoinTable temp = lhsJoin;
-            lhsJoin = rhsJoin;
-            rhsJoin = temp;
-        }
-        
-        List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
-        List<OrderByNode> lhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
-        List<OrderByNode> rhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
-        for (EqualParseNode condition : joinConditionNodes) {
-            lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
-            rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
-        }
-        
-        Scan lhsScan = ScanUtil.newScan(originalScan);
-        StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
-        boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
-        QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
-        PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
-        boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
-        
-        Scan rhsScan = ScanUtil.newScan(originalScan);
-        StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
-        QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
-        PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
-        
-        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE);
-        List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
-        List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
-        
-        boolean needsMerge = rhsJoin.hasPostReference();
-        int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
-        PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
-
-        ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
-        TableRef tableRef = resolver.getTables().get(0);
-        StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
-        subCtx.setCurrentTable(tableRef);
-        QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
-        context.setCurrentTable(tableRef);
-        context.setResolver(resolver);
-        TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
-        ParseNode where = joinTable.getPostFiltersCombined();
-        SelectStatement select = asSubquery
-                ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
-                        Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, null, 0, false,
-                        joinTable.getStatement().hasSequence(), Collections.<SelectStatement> emptyList(),
+                Scan lhsScan = ScanUtil.newScan(originalScan);
+                StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
+                boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
+                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
+                PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
+                boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
+
+                Scan rhsScan = ScanUtil.newScan(originalScan);
+                StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
+                QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
+                PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
+
+                Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy);
+                List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
+                List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
+
+                boolean needsMerge = rhsJoin.hasPostReference();
+                int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
+                PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
+
+                ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
+                TableRef tableRef = resolver.getTables().get(0);
+                StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
+                subCtx.setCurrentTable(tableRef);
+                QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
+                context.setCurrentTable(tableRef);
+                context.setResolver(resolver);
+                TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
+                ParseNode where = joinTable.getPostFiltersCombined();
+                SelectStatement select = asSubquery
+                        ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
+                        Collections.<AliasedNode>emptyList(), where, null, null, orderBy, null, null, 0, false,
+                        joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(),
                         joinTable.getStatement().getUdfParseNodes())
-                : NODE_FACTORY.select(joinTable.getStatement(), from, where);
-        
-        return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+                        : NODE_FACTORY.select(joinTable.getStatement(), from, where);
+
+                return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+            }
+            default:
+                throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
+        }
     }
 
     private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, SelectStatement select, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index ca88984..c2edaf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.optimize.Cost;
@@ -90,4 +91,5 @@ public interface QueryPlan extends StatementPlan {
      */
     public boolean useRoundRobinIterator() throws SQLException;
 
+    <T> T accept(QueryPlanVisitor<T> visitor);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 2714858..02aadc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -35,6 +35,7 @@ import org.apache.htrace.Sampler;
 import org.apache.htrace.TraceScope;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
@@ -270,6 +271,11 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     public Long getEstimatedRowsToScan() {
         return 0l;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9461d0d6/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 2e042e7..0c8e8dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -33,6 +33,10 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.RowKeyExpression;
@@ -117,25 +121,39 @@ public class AggregatePlan extends BaseQueryPlan {
 
     @Override
     public Cost getCost() {
-        Long byteCount = null;
+        Double outputBytes = this.accept(new ByteCountVisitor());
+        Double rowWidth = this.accept(new AvgRowWidthVisitor());
+        Long inputRows = null;
         try {
-            byteCount = getEstimatedBytesToScan();
+            inputRows = getEstimatedRowsToScan();
         } catch (SQLException e) {
             // ignored.
         }
-
-        if (byteCount == null) {
+        if (inputRows == null || outputBytes == null || rowWidth == null) {
             return Cost.UNKNOWN;
         }
+        double inputBytes = inputRows * rowWidth;
+        double rowsBeforeHaving = RowCountVisitor.aggregate(
+                                    RowCountVisitor.filter(
+                                            inputRows.doubleValue(),
+                                            RowCountVisitor.stripSkipScanFilter(
+                                                    context.getScan().getFilter())),
+                                    groupBy);
+        double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
+        double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+        double bytesAfterHaving = rowWidth * rowsAfterHaving;
 
         int parallelLevel = CostUtil.estimateParallelLevel(
                 true, context.getConnection().getQueryServices());
-        Cost cost = CostUtil.estimateAggregateCost(byteCount,
-                groupBy, aggregators.getEstimatedByteSize(), parallelLevel);
+        Cost cost = new Cost(0, 0, inputBytes);
+        Cost aggCost = CostUtil.estimateAggregateCost(
+                inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
+        cost = cost.plus(aggCost);
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            double outputBytes = CostUtil.estimateAggregateOutputBytes(
-                    byteCount, groupBy, aggregators.getEstimatedByteSize());
-            Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+            parallelLevel = CostUtil.estimateParallelLevel(
+                    false, context.getConnection().getQueryServices());
+            Cost orderByCost = CostUtil.estimateOrderByCost(
+                    bytesAfterHaving, outputBytes, parallelLevel);
             cost = cost.plus(orderByCost);
         }
         return cost;
@@ -304,4 +322,9 @@ public class AggregatePlan extends BaseQueryPlan {
         return false;
     }
 
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
 }