You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/09/02 17:05:10 UTC

git commit: PHOENIX-852 Optimize child/parent foreign key joins

Repository: phoenix
Updated Branches:
  refs/heads/master c36973263 -> 27a6ccef3


PHOENIX-852 Optimize child/parent foreign key joins


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/27a6ccef
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/27a6ccef
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/27a6ccef

Branch: refs/heads/master
Commit: 27a6ccef31b3fa7fd4ca4a4e9cb9bc01ae498238
Parents: c369732
Author: maryannxue <ma...@apache.org>
Authored: Tue Sep 2 11:04:31 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Tue Sep 2 11:04:31 2014 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 263 ++++++++++++++++++-
 .../apache/phoenix/compile/JoinCompiler.java    |  20 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  43 ++-
 .../org/apache/phoenix/compile/ScanRanges.java  |   6 +-
 .../apache/phoenix/compile/WhereCompiler.java   |  33 ++-
 .../apache/phoenix/compile/WhereOptimizer.java  |  75 +++++-
 .../apache/phoenix/execute/HashJoinPlan.java    | 103 +++++++-
 .../apache/phoenix/join/HashCacheClient.java    |  14 +-
 .../java/org/apache/phoenix/parse/HintNode.java |   8 +
 .../apache/phoenix/schema/PArrayDataType.java   |  13 +
 .../org/apache/phoenix/schema/PDataType.java    | 252 +++++++++++++++++-
 .../apache/phoenix/schema/PDataTypeTest.java    |  21 +-
 12 files changed, 820 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 65ebaa6..05f2837 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -229,7 +229,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
                 "            SERVER FILTER BY QUANTITY < 5000\n" +
                 "    BUILD HASH TABLE 1\n" +
-                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY item_id IN (O.item_id)",
                 /*
                  * testSelfJoin()
                  *     SELECT i2.item_id, i1.name FROM joinItemTable i1 
@@ -240,7 +241,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "    BUILD HASH TABLE 0\n" +
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
-                "            SERVER FILTER BY FIRST KEY ONLY",
+                "            SERVER FILTER BY FIRST KEY ONLY\n" +
+                "    DYNAMIC SERVER FILTER BY item_id BETWEEN MIN/MAX OF (I2.item_id)",
                 /*
                  * testSelfJoin()
                  *     SELECT i1.name, i2.name FROM joinItemTable i1 
@@ -252,7 +254,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "CLIENT MERGE SORT\n" +
                 "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "    BUILD HASH TABLE 0\n" +
-                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME,
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY item_id BETWEEN MIN/MAX OF (I2.supplier_id)",
                 /*
                  * testStarJoin()
                  *     SELECT order_id, c.name, i.name iname, quantity, o.date 
@@ -283,7 +286,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
                 "            PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "            BUILD HASH TABLE 0\n" +
-                "                CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_CUSTOMER_TABLE_DISPLAY_NAME,
+                "                CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_CUSTOMER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY item_id BETWEEN MIN/MAX OF (O.item_id)",
                 /*
                  * testSubJoin()
                  *     SELECT * FROM joinCustomerTable c 
@@ -310,7 +314,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "                    SERVER FILTER BY NAME != 'T3'\n" +
                 "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "                    BUILD HASH TABLE 0\n" +
-                "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+                "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY customer_id IN (O.customer_id)",
                 /* 
                  * testJoinWithSubqueryAndAggregation()
                  *     SELECT i.name, sum(quantity) FROM joinOrderTable o 
@@ -439,7 +444,46 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
                 "    BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY supplier_id BETWEEN MIN/MAX OF (I.supplier_id)\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col1 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY COL0 IN (RHS.COL2)",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col1 AND lhs.col1 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY (COL0, COL1) IN ((RHS.COL1, RHS.COL2))",
                 }});
         testCases.add(new String[][] {
                 {
@@ -574,7 +618,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "    BUILD HASH TABLE 0\n" +
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
-                "            SERVER FILTER BY FIRST KEY ONLY",
+                "            SERVER FILTER BY FIRST KEY ONLY\n" +
+                "    DYNAMIC SERVER FILTER BY item_id BETWEEN MIN/MAX OF (I2.:item_id)",
                 /*
                  * testSelfJoin()
                  *     SELECT i1.name, i2.name FROM joinItemTable i1 
@@ -647,7 +692,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "                    SERVER FILTER BY NAME != 'T3'\n" +
                 "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "                    BUILD HASH TABLE 0\n" +
-                "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+                "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY customer_id IN (O.customer_id)",
                 /* 
                  * testJoinWithSubqueryAndAggregation()
                  *     SELECT i.name, sum(quantity) FROM joinOrderTable o 
@@ -777,7 +823,46 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" +
                 "    BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY supplier_id BETWEEN MIN/MAX OF (I.0:supplier_id)\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col1 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY COL0 IN (RHS.COL2)",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col1 AND lhs.col1 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY (COL0, COL1) IN ((RHS.COL1, RHS.COL2))",
                 }});
         testCases.add(new String[][] {
                 {
@@ -910,7 +995,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "            SERVER FILTER BY QUANTITY < 5000\n" +
                 "    BUILD HASH TABLE 1\n" +
                 "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + "" + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + " [-32768]\n" +
-                "        CLIENT MERGE SORT",
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY item_id IN (O.item_id)",
                 /*
                  * testSelfJoin()
                  *     SELECT i2.item_id, i1.name FROM joinItemTable i1 
@@ -922,7 +1008,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "    BUILD HASH TABLE 0\n" +
                 "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER "+ MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX +""+ JOIN_ITEM_TABLE_DISPLAY_NAME +" [-32768]\n"  +
                 "            SERVER FILTER BY FIRST KEY ONLY\n" +
-                "        CLIENT MERGE SORT",
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY item_id BETWEEN MIN/MAX OF (I2.:item_id)",
                 /*
                  * testSelfJoin()
                  *     SELECT i1.name, i2.name FROM joinItemTable i1 
@@ -936,7 +1023,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "    BUILD HASH TABLE 0\n" +
                 "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX +""+ JOIN_ITEM_TABLE_DISPLAY_NAME +" [-32768]\n" +
-                "        CLIENT MERGE SORT",
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY item_id BETWEEN MIN/MAX OF (I2.0:supplier_id)",
                 /*
                  * testStarJoin()
                  *     SELECT order_id, c.name, i.name iname, quantity, o.date 
@@ -972,7 +1060,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "            PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "            BUILD HASH TABLE 0\n" +
                 "                CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + "" + JOIN_CUSTOMER_TABLE_DISPLAY_NAME+" [-32768]\n"+
-                "                CLIENT MERGE SORT",
+                "                CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY item_id BETWEEN MIN/MAX OF (O.item_id)",
                 /*
                  * testSubJoin()
                  *     SELECT * FROM joinCustomerTable c 
@@ -1000,7 +1089,8 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "                CLIENT MERGE SORT\n" +
                 "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "                    BUILD HASH TABLE 0\n" +
-                "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+                "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY customer_id IN (O.customer_id)",
                 /* 
                  * testJoinWithSubqueryAndAggregation()
                  *     SELECT i.name, sum(quantity) FROM joinOrderTable o 
@@ -1135,7 +1225,46 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT MERGE SORT\n" +
                 "    BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY supplier_id BETWEEN MIN/MAX OF (I.0:supplier_id)\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col1 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY COL0 IN (RHS.COL2)",
+                /*
+                 * testJoinWithKeyRangeOptimization()
+                 *     SELECT (*SKIP_SCAN_HASH_JOIN*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 
+                 *     FROM TEMP_TABLE_COMPOSITE_PK lhs 
+                 *     JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col1 AND lhs.col1 = rhs.col2
+                 */
+                "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    DYNAMIC SERVER FILTER BY (COL0, COL1) IN ((RHS.COL1, RHS.COL2))",
                 }});
         return testCases;
     }
@@ -3645,6 +3774,116 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    @Test
+    public void testJoinWithKeyRangeOptimization() throws Exception {
+        String tempTableWithCompositePK = "TEMP_TABLE_COMPOSITE_PK";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute("CREATE TABLE " + tempTableWithCompositePK 
+                    + "   (col0 INTEGER NOT NULL, " 
+                    + "    col1 INTEGER NOT NULL, " 
+                    + "    col2 INTEGER NOT NULL, "
+                    + "    col3 INTEGER "
+                    + "   CONSTRAINT pk PRIMARY KEY (col0, col1, col2)) " 
+                    + "   SALT_BUCKETS=4");
+            
+            PreparedStatement upsertStmt = conn.prepareStatement(
+                    "upsert into " + tempTableWithCompositePK + "(col0, col1, col2, col3) " + "values (?, ?, ?, ?)");
+            for (int i = 0; i < 3; i++) {
+                upsertStmt.setInt(1, i + 1);
+                upsertStmt.setInt(2, i + 2);
+                upsertStmt.setInt(3, i + 3);
+                upsertStmt.setInt(4, i + 5);
+                upsertStmt.execute();
+            }
+            conn.commit();
+            
+            // No leading part of PK
+            String query = "SELECT /*+ SKIP_SCAN_HASH_JOIN*/ lhs.col0, lhs.col1, lhs.col2, lhs.col3, rhs.col0, rhs.col1, rhs.col2, rhs.col3 FROM " 
+                    + tempTableWithCompositePK + " lhs JOIN "
+                    + tempTableWithCompositePK + " rhs ON lhs.col1 = rhs.col2";
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 2);
+            assertEquals(rs.getInt(2), 3);
+            assertEquals(rs.getInt(3), 4);
+            assertEquals(rs.getInt(4), 6);            
+            assertEquals(rs.getInt(5), 1);
+            assertEquals(rs.getInt(6), 2);
+            assertEquals(rs.getInt(7), 3);
+            assertEquals(rs.getInt(8), 5);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 3);
+            assertEquals(rs.getInt(2), 4);
+            assertEquals(rs.getInt(3), 5);
+            assertEquals(rs.getInt(4), 7);
+            assertEquals(rs.getInt(5), 2);
+            assertEquals(rs.getInt(6), 3);
+            assertEquals(rs.getInt(7), 4);
+            assertEquals(rs.getInt(8), 6);
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals(plans[21], QueryUtil.getExplainPlan(rs));
+            
+            // Two parts of PK but only one leading part
+            query = "SELECT /*+ SKIP_SCAN_HASH_JOIN*/ lhs.col0, lhs.col1, lhs.col2, lhs.col3, rhs.col0, rhs.col1, rhs.col2, rhs.col3 FROM " 
+                    + tempTableWithCompositePK + " lhs JOIN "
+                    + tempTableWithCompositePK + " rhs ON lhs.col2 = rhs.col3 AND lhs.col0 = rhs.col2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 3);
+            assertEquals(rs.getInt(2), 4);
+            assertEquals(rs.getInt(3), 5);
+            assertEquals(rs.getInt(4), 7);
+            assertEquals(rs.getInt(5), 1);
+            assertEquals(rs.getInt(6), 2);
+            assertEquals(rs.getInt(7), 3);
+            assertEquals(rs.getInt(8), 5);
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals(plans[22], QueryUtil.getExplainPlan(rs));
+            
+            // Two leading parts of PK
+            query = "SELECT /*+ SKIP_SCAN_HASH_JOIN*/ lhs.col0, lhs.col1, lhs.col2, lhs.col3, rhs.col0, rhs.col1, rhs.col2, rhs.col3 FROM " 
+                    + tempTableWithCompositePK + " lhs JOIN "
+                    + tempTableWithCompositePK + " rhs ON lhs.col1 = rhs.col2 AND lhs.col0 = rhs.col1";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 2);
+            assertEquals(rs.getInt(2), 3);
+            assertEquals(rs.getInt(3), 4);
+            assertEquals(rs.getInt(4), 6);
+            assertEquals(rs.getInt(5), 1);
+            assertEquals(rs.getInt(6), 2);
+            assertEquals(rs.getInt(7), 3);
+            assertEquals(rs.getInt(8), 5);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 3);
+            assertEquals(rs.getInt(2), 4);
+            assertEquals(rs.getInt(3), 5);
+            assertEquals(rs.getInt(4), 7);
+            assertEquals(rs.getInt(5), 2);
+            assertEquals(rs.getInt(6), 3);
+            assertEquals(rs.getInt(7), 4);
+            assertEquals(rs.getInt(8), 6);
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals(plans[23], QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
 
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 810e1cd..81d169c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -83,7 +83,6 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
@@ -421,6 +420,21 @@ public class JoinCompiler {
             
             return false;
         }
+        
+        public boolean hasFilters() {
+           if (!postFilters.isEmpty())
+               return true;
+           
+           if (!hasRightJoin && table.hasFilters())
+               return true;
+           
+           for (JoinTable joinTable : prefilterAcceptedTables) {
+               if (joinTable.hasFilters())
+                   return true;
+           }
+           
+           return false;
+        }
     }
     
     public static class JoinSpec {
@@ -664,6 +678,10 @@ public class JoinCompiler {
             return NODE_FACTORY.select(from, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, null, null, 0, false, select.hasSequence());
         }
         
+        public boolean hasFilters() {
+            return isSubselect() ? (!postFilters.isEmpty() || subselect.getWhere() != null || subselect.getHaving() != null) : !preFilters.isEmpty();
+        }
+        
         public boolean isFlat() {
             return subselect == null || JoinCompiler.isFlat(subselect);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index fcf3a18..6f9d688 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowValueConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -58,6 +59,8 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ScanUtil;
 
+import com.google.common.collect.Lists;
+
 
 
 /**
@@ -179,6 +182,9 @@ public class QueryCompiler {
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
             List<Expression>[] joinExpressions = new List[count];
             List<Expression>[] hashExpressions = new List[count];
+            Expression[] keyRangeLhsExpressions = new Expression[count];
+            Expression[] keyRangeRhsExpressions = new Expression[count];
+            boolean[] hasFilters = new boolean[count];
             JoinType[] joinTypes = new JoinType[count];
             PTable[] tables = new PTable[count];
             int[] fieldPositions = new int[count];
@@ -211,6 +217,10 @@ public class QueryCompiler {
                 Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, leftResolver, resolver);
                 joinExpressions[i] = joinConditions.getFirst();
                 hashExpressions[i] = joinConditions.getSecond();
+                Pair<Expression, Expression> keyRangeExpressions = getKeyExpressionCombinations(context, tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions[i]);
+                keyRangeLhsExpressions[i] = keyRangeExpressions.getFirst();
+                keyRangeRhsExpressions[i] = keyRangeExpressions.getSecond();
+                hasFilters[i] = joinSpec.getJoinTable().hasFilters();
                 joinTypes[i] = joinSpec.getType();
                 if (i < count - 1) {
                     fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
@@ -228,7 +238,7 @@ public class QueryCompiler {
                 limit = LimitCompiler.compile(context, query);
             }
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit, forceProjection);
-            return new HashJoinPlan(joinTable.getStatement(), plan, joinInfo, hashExpressions, joinPlans, clientProjectors);
+            return new HashJoinPlan(joinTable.getStatement(), plan, joinInfo, hashExpressions, keyRangeLhsExpressions, keyRangeRhsExpressions, joinPlans, clientProjectors, hasFilters);
         }
         
         JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
@@ -283,13 +293,42 @@ public class QueryCompiler {
                 limit = LimitCompiler.compile(context, rhs);
             }
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
-            return new HashJoinPlan(joinTable.getStatement(), rhsPlan, joinInfo, new List[] {hashExpressions}, new QueryPlan[] {lhsPlan}, new TupleProjector[] {clientProjector});
+            Pair<Expression, Expression> keyRangeExpressions = getKeyExpressionCombinations(context, rhsTableRef, type, joinExpressions, hashExpressions);
+            return new HashJoinPlan(joinTable.getStatement(), rhsPlan, joinInfo, new List[] {hashExpressions}, new Expression[] {keyRangeExpressions.getFirst()}, new Expression[] {keyRangeExpressions.getSecond()}, new QueryPlan[] {lhsPlan}, new TupleProjector[] {clientProjector}, new boolean[] {lhsJoin.hasFilters()});
         }
         
         // Do not support queries like "A right join B left join C" with hash-joins.
         throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported.");
     }
     
+    private Pair<Expression, Expression> getKeyExpressionCombinations(StatementContext context, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {
+        if (type != JoinType.Inner)
+            return new Pair<Expression, Expression>(null, null);
+        
+        Scan scanCopy = ScanUtil.newScan(context.getScan());
+        StatementContext contextCopy = new StatementContext(statement, context.getResolver(), scanCopy, new SequenceManager(statement));
+        contextCopy.setCurrentTable(table);
+        List<Expression> lhsCombination = WhereOptimizer.getKeyExpressionCombination(contextCopy, this.select, joinExpressions);
+        if (lhsCombination.isEmpty())
+            return new Pair<Expression, Expression>(null, null);
+        
+        List<Expression> rhsCombination = Lists.newArrayListWithExpectedSize(lhsCombination.size());
+        for (int i = 0; i < lhsCombination.size(); i++) {
+            Expression lhs = lhsCombination.get(i);
+            for (int j = 0; j < joinExpressions.size(); j++) {
+                if (lhs == joinExpressions.get(j)) {
+                    rhsCombination.add(hashExpressions.get(j));
+                    break;
+                }
+            }
+        }
+        
+        if (lhsCombination.size() == 1)
+            return new Pair<Expression, Expression>(lhsCombination.get(0), rhsCombination.get(0));
+        
+        return new Pair<Expression, Expression>(new RowValueConstructorExpression(lhsCombination, false), new RowValueConstructorExpression(rhsCombination, false));
+    }
+    
     protected QueryPlan compileSubquery(SelectStatement subquery) throws SQLException {
         ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, this.statement.getConnection());
         subquery = StatementNormalizer.normalize(subquery, resolver);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 21ccff2..1052601 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -258,7 +258,11 @@ public class ScanRanges {
             return false;
         }
         return filter.hasIntersect(lowerInclusiveKey, upperExclusiveKey);
-   }
+    }
+    
+    public int getPkColumnSpan() {
+        return this == ScanRanges.NOTHING ? 0 : ScanUtil.calculateSlotSpan(ranges, slotSpan);
+    }
 
     @Override
     public String toString() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index b9a53f8..7bcb6d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -19,7 +19,9 @@ package org.apache.phoenix.compile;
 
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -88,6 +90,20 @@ public class WhereCompiler {
      * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
      */
     public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere) throws SQLException {
+        return compile(context, statement, viewWhere, Collections.<Expression>emptyList(), false);
+    }
+    
+    /**
+     * Compiles the WHERE clause, AND-ing in any dynamically generated filters.
+     * @param context the shared context during query compilation
+     * @param statement the statement being compiled
+     * @param viewWhere the WHERE clause of the view this table derives from, if any
+     * @param dynamicFilters filter expressions to AND with the compiled WHERE clause
+     * @param hashJoinOptimization if true, push key ranges to the scan but do not set a scan filter
+     * @throws SQLException if mismatched types are found, bind values do not match binds,
+     * or invalid function arguments are encountered.
+     */    
+    public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, List<Expression> dynamicFilters, boolean hashJoinOptimization) throws SQLException {
         Set<Expression> extractedNodes = Sets.<Expression>newHashSet();
         WhereExpressionCompiler whereCompiler = new WhereExpressionCompiler(context);
         ParseNode where = statement.getWhere();
@@ -103,9 +119,14 @@ public class WhereCompiler {
             Expression viewExpression = viewWhere.accept(viewWhereCompiler);
             expression = AndExpression.create(Lists.newArrayList(expression, viewExpression));
         }
+        if (!dynamicFilters.isEmpty()) {
+            List<Expression> filters = Lists.newArrayList(expression);
+            filters.addAll(dynamicFilters);
+            expression = AndExpression.create(filters);
+        }
         
         expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes);
-        setScanFilter(context, statement, expression, whereCompiler.disambiguateWithFamily);
+        setScanFilter(context, statement, expression, whereCompiler.disambiguateWithFamily, hashJoinOptimization);
 
         return expression;
     }
@@ -189,14 +210,14 @@ public class WhereCompiler {
      * @param context the shared context during query compilation
      * @param whereClause the final where clause expression.
      */
-    private static void setScanFilter(StatementContext context, FilterableStatement statement, Expression whereClause, boolean disambiguateWithFamily) {
-        Filter filter = null;
+    private static void setScanFilter(StatementContext context, FilterableStatement statement, Expression whereClause, boolean disambiguateWithFamily, boolean hashJoinOptimization) {
         Scan scan = context.getScan();
         assert scan.getFilter() == null;
 
         if (LiteralExpression.isFalse(whereClause)) {
             context.setScanRanges(ScanRanges.NOTHING);
-        } else if (whereClause != null && !LiteralExpression.isTrue(whereClause)) {
+        } else if (whereClause != null && !LiteralExpression.isTrue(whereClause) && !hashJoinOptimization) {
+            Filter filter = null;
             final Counter counter = new Counter();
             whereClause.accept(new KeyValueExpressionVisitor() {
 
@@ -230,11 +251,11 @@ public class WhereCompiler {
                 filter = disambiguateWithFamily ? new MultiCFCQKeyValueComparisonFilter(whereClause) : new MultiCQKeyValueComparisonFilter(whereClause);
                 break;
             }
+            scan.setFilter(filter);
         }
 
-        scan.setFilter(filter);
         ScanRanges scanRanges = context.getScanRanges();
-        boolean forcedSkipScan = statement.getHint().hasHint(Hint.SKIP_SCAN);
+        boolean forcedSkipScan =  statement.getHint().hasHint(Hint.SKIP_SCAN);
         boolean forcedRangeScan = statement.getHint().hasHint(Hint.RANGE_SCAN);
         if (forcedSkipScan || (scanRanges.useSkipScanFilter() && !forcedRangeScan)) {
             ScanUtil.andFilterAtBeginning(scan, scanRanges.getSkipScanFilter());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 2faebf0..635dbc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -99,7 +100,7 @@ public class WhereOptimizer {
     public static Expression pushKeyExpressionsToScan(StatementContext context, FilterableStatement statement,
             Expression whereClause, Set<Expression> extractNodes) {
         PName tenantId = context.getConnection().getTenantId();
-        PTable table = context.getResolver().getTables().get(0).getTable();
+        PTable table = context.getCurrentTable().getTable();
         if (whereClause == null && (tenantId == null || !table.isMultiTenant()) && table.getViewIndexId() == null) {
             context.setScanRanges(ScanRanges.EVERYTHING);
             return whereClause;
@@ -298,6 +299,78 @@ public class WhereOptimizer {
             return whereClause.accept(new RemoveExtractedNodesVisitor(extractNodes));
         }
     }
+    
+    /**
+     * Get an optimal combination of key expressions for hash join key range optimization.
+     * @param context the temporary context to get scan ranges set by pushKeyExpressionsToScan()
+     * @param statement the statement being compiled
+     * @param expressions the join key expressions
+     * @return the optimal list of key expressions
+     */
+    public static List<Expression> getKeyExpressionCombination(StatementContext context, FilterableStatement statement, List<Expression> expressions) throws SQLException {
+        List<Integer> candidateIndexes = Lists.newArrayList();
+        final List<Integer> pkPositions = Lists.newArrayList();
+        for (int i = 0; i < expressions.size(); i++) {
+            KeyExpressionVisitor visitor = new KeyExpressionVisitor(context, context.getCurrentTable().getTable());
+            KeyExpressionVisitor.KeySlots keySlots = expressions.get(i).accept(visitor);
+            int minPkPos = Integer.MAX_VALUE; 
+            if (keySlots != null) {
+                Iterator<KeyExpressionVisitor.KeySlot> iterator = keySlots.iterator();
+                while (iterator.hasNext()) {
+                    KeyExpressionVisitor.KeySlot slot = iterator.next();
+                    if (slot.getPKPosition() < minPkPos) {
+                        minPkPos = slot.getPKPosition();
+                    }
+                }
+                if (minPkPos != Integer.MAX_VALUE) {
+                    candidateIndexes.add(i);
+                }
+            }
+            pkPositions.add(minPkPos);
+        }
+        
+        if (candidateIndexes.isEmpty())
+            return Collections.<Expression> emptyList();
+        
+        Collections.sort(candidateIndexes, new Comparator<Integer>() {
+            @Override
+            public int compare(Integer left, Integer right) {
+                return pkPositions.get(left) - pkPositions.get(right);
+            }
+        });
+        
+        List<Expression> candidates = Lists.newArrayList();
+        List<List<Expression>> sampleValues = Lists.newArrayList();
+        for (Integer index : candidateIndexes) {
+            candidates.add(expressions.get(index));
+        }        
+        for (int i = 0; i < 2; i++) {
+            List<Expression> group = Lists.newArrayList();
+            for (Expression expression : candidates) {
+                PDataType type = expression.getDataType();
+                group.add(LiteralExpression.newConstant(type.getSampleValue(), type));
+            }
+            sampleValues.add(group);
+        }
+        
+        int count = 0;
+        int maxPkSpan = 0;
+        while (count < candidates.size()) {
+            Expression lhs = count == 0 ? candidates.get(0) : new RowValueConstructorExpression(candidates.subList(0, count + 1), false);
+            Expression firstRhs = count == 0 ? sampleValues.get(0).get(0) : new RowValueConstructorExpression(sampleValues.get(0).subList(0, count + 1), true);
+            Expression secondRhs = count == 0 ? sampleValues.get(1).get(0) : new RowValueConstructorExpression(sampleValues.get(1).subList(0, count + 1), true);
+            Expression testExpression = InListExpression.create(Lists.newArrayList(lhs, firstRhs, secondRhs), false, context.getTempPtr());
+            pushKeyExpressionsToScan(context, statement, testExpression);
+            int pkSpan = context.getScanRanges().getPkColumnSpan();
+            if (pkSpan <= maxPkSpan) {
+                break;
+            }
+            maxPkSpan = pkSpan;
+            count++;
+        }
+        
+        return candidates.subList(0, count);
+    }
 
     private static class RemoveExtractedNodesVisitor extends TraverseNoExpressionVisitor<Expression> {
         private final Set<Expression> nodesToRemove;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index f027c71..2cf89fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -29,15 +29,23 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.WhereCompiler;
+import org.apache.phoenix.expression.AndExpression;
+import org.apache.phoenix.expression.ComparisonExpression;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.InListExpression;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.job.JobManager.JobCallable;
@@ -45,10 +53,16 @@ import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
@@ -62,19 +76,30 @@ public class HashJoinPlan implements QueryPlan {
     private final BaseQueryPlan plan;
     private final HashJoinInfo joinInfo;
     private final List<Expression>[] hashExpressions;
+    private final Expression[] keyRangeLhsExpressions;
+    private final Expression[] keyRangeRhsExpressions;
     private final QueryPlan[] hashPlans;
     private final TupleProjector[] clientProjectors;
+    private final boolean[] hasFilters;
+    private final boolean forceHashJoinRangeScan;
+    private final boolean forceHashJoinSkipScan;
 
     public HashJoinPlan(FilterableStatement statement, 
             BaseQueryPlan plan, HashJoinInfo joinInfo,
-            List<Expression>[] hashExpressions, QueryPlan[] hashPlans, 
-            TupleProjector[] clientProjectors) {
+            List<Expression>[] hashExpressions, Expression[] keyRangeLhsExpressions,
+            Expression[] keyRangeRhsExpressions, QueryPlan[] hashPlans, 
+            TupleProjector[] clientProjectors, boolean[] hasFilters) {
         this.statement = statement;
         this.plan = plan;
         this.joinInfo = joinInfo;
         this.hashExpressions = hashExpressions;
+        this.keyRangeLhsExpressions = keyRangeLhsExpressions;
+        this.keyRangeRhsExpressions = keyRangeRhsExpressions;
         this.hashPlans = hashPlans;
         this.clientProjectors = clientProjectors;
+        this.hasFilters = hasFilters;
+        this.forceHashJoinRangeScan = plan.getStatement().getHint().hasHint(Hint.RANGE_SCAN_HASH_JOIN);
+        this.forceHashJoinSkipScan = plan.getStatement().getHint().hasHint(Hint.SKIP_SCAN_HASH_JOIN);
     }
 
     @Override
@@ -106,18 +131,24 @@ public class HashJoinPlan implements QueryPlan {
         ExecutorService executor = services.getExecutor();
         List<Future<ServerCache>> futures = new ArrayList<Future<ServerCache>>(count);
         List<SQLCloseable> dependencies = new ArrayList<SQLCloseable>(count);
+        List<Expression> keyRangeExpressions = new ArrayList<Expression>();
+        @SuppressWarnings("unchecked")
+        final List<ImmutableBytesWritable>[] keyRangeRhsValues = new List[count];  
         final int maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
         final AtomicLong firstJobEndTime = new AtomicLong(0);
         SQLException firstException = null;
         for (int i = 0; i < count; i++) {
             final int index = i;
+            if (keyRangeRhsExpressions[index] != null) {
+                keyRangeRhsValues[index] = new ArrayList<ImmutableBytesWritable>();
+            }
             futures.add(executor.submit(new JobCallable<ServerCache>() {
 
                 @Override
                 public ServerCache call() throws Exception {
                     QueryPlan hashPlan = hashPlans[index];
                     ServerCache cache = hashClient.addHashCache(ranges, hashPlan.iterator(), 
-                            clientProjectors[index], hashPlan.getEstimatedSize(), hashExpressions[index], plan.getTableRef());
+                            clientProjectors[index], hashPlan.getEstimatedSize(), hashExpressions[index], plan.getTableRef(), keyRangeRhsExpressions[index], keyRangeRhsValues[index]);
                     long endTime = System.currentTimeMillis();
                     boolean isSet = firstJobEndTime.compareAndSet(0, endTime);
                     if (!isSet && (endTime - firstJobEndTime.get()) > maxServerCacheTimeToLive) {
@@ -137,6 +168,9 @@ public class HashJoinPlan implements QueryPlan {
                 ServerCache cache = futures.get(i).get();
                 joinIds[i].set(cache.getId());
                 dependencies.add(cache);
+                if (keyRangeRhsExpressions[i] != null) {
+                    keyRangeExpressions.add(createKeyRangeExpression(keyRangeLhsExpressions[i], keyRangeRhsExpressions[i], keyRangeRhsValues[i], plan.getContext().getTempPtr(), hasFilters[i]));
+                }
             } catch (InterruptedException e) {
                 if (firstException == null) {
                     firstException = new SQLException("Hash plan [" + i + "] execution interrupted.", e);
@@ -154,10 +188,56 @@ public class HashJoinPlan implements QueryPlan {
         }
 
         HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
+        if (!keyRangeExpressions.isEmpty()) {
+            StatementContext context = plan.getContext();
+            PTable table = context.getCurrentTable().getTable();
+            ParseNode viewWhere = table.getViewStatement() == null ? null : new SQLParser(table.getViewStatement()).parseQuery().getWhere();
+            context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (plan.getStatement()), plan.getContext().getConnection()));
+            WhereCompiler.compile(plan.getContext(), plan.getStatement(), viewWhere, keyRangeExpressions, true);
+        }
 
         return plan.iterator(dependencies);
     }
 
+    private Expression createKeyRangeExpression(Expression lhsExpression,
+            Expression rhsExpression, List<ImmutableBytesWritable> rhsValues, 
+            ImmutableBytesWritable ptr, boolean hasFilters) throws SQLException {
+        if (rhsValues.isEmpty())
+            return LiteralExpression.newConstant(null, PDataType.BOOLEAN, true);
+        
+        PDataType type = rhsExpression.getDataType();
+        if (!useInClause(hasFilters)) {
+            ImmutableBytesWritable minValue = rhsValues.get(0);
+            ImmutableBytesWritable maxValue = rhsValues.get(0);
+            for (ImmutableBytesWritable value : rhsValues) {
+                if (value.compareTo(minValue) < 0) {
+                    minValue = value;
+                }
+                if (value.compareTo(maxValue) > 0) {
+                    maxValue = value;
+                }
+            }
+            
+            if (minValue.equals(maxValue))
+                return ComparisonExpression.create(CompareOp.EQUAL, Lists.newArrayList(lhsExpression, LiteralExpression.newConstant(type.toObject(minValue), type)), ptr);
+            
+            return AndExpression.create(Lists.newArrayList(
+                    ComparisonExpression.create(CompareOp.GREATER_OR_EQUAL, Lists.newArrayList(lhsExpression, LiteralExpression.newConstant(type.toObject(minValue), type)), ptr), 
+                    ComparisonExpression.create(CompareOp.LESS_OR_EQUAL, Lists.newArrayList(lhsExpression, LiteralExpression.newConstant(type.toObject(maxValue), type)), ptr)));
+        }
+        
+        List<Expression> children = Lists.newArrayList(lhsExpression);
+        for (ImmutableBytesWritable value : rhsValues) {
+            children.add(LiteralExpression.newConstant(type.toObject(value), type));
+        }
+        
+        return InListExpression.create(children, false, ptr);
+    }
+    
+    private boolean useInClause(boolean hasFilters) {
+        return this.forceHashJoinSkipScan || (!this.forceHashJoinRangeScan && hasFilters);
+    }
+
     @Override
     public long getEstimatedSize() {
         return plan.getEstimatedSize();
@@ -183,6 +263,23 @@ public class HashJoinPlan implements QueryPlan {
                 planSteps.add("        " + step);
             }
         }
+        String dynamicFilters = null;
+        int filterCount = 0;
+        for (int i = 0; i < count; i++) {
+            if (keyRangeLhsExpressions[i] != null) {
+                if (filterCount == 1) {
+                    dynamicFilters = "(" + dynamicFilters + ")";
+                }
+                String filter = keyRangeLhsExpressions[i].toString() 
+                        + (useInClause(hasFilters[i]) ? " IN " : " BETWEEN MIN/MAX OF ") 
+                        + "(" + keyRangeRhsExpressions[i].toString() + ")";
+                dynamicFilters = dynamicFilters == null ? filter : (dynamicFilters + " AND (" + filter + ")");
+                filterCount++;
+            }
+        }
+        if (dynamicFilters != null) {
+            planSteps.add("    DYNAMIC SERVER FILTER BY " + dynamicFilters);
+        }
         if (joinInfo.getPostJoinFilterExpression() != null) {
             planSteps.add("    AFTER-JOIN SERVER FILTER BY " + joinInfo.getPostJoinFilterExpression().toString());
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 909e772..863b535 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -69,16 +69,16 @@ public class HashCacheClient  {
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef) throws SQLException {
+    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        serialize(ptr, iterator, projector, estimatedSize, onExpressions);
+        serialize(ptr, iterator, projector, estimatedSize, onExpressions, keyRangeRhsExpression, keyRangeRhsValues);
         return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
     }
     
-    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions) throws SQLException {
+    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
         long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
         estimatedSize = Math.min(estimatedSize, maxSize);
         if (estimatedSize > Integer.MAX_VALUE) {
@@ -105,6 +105,14 @@ public class HashCacheClient  {
                 if (baOut.size() > maxSize) {
                     throw new MaxServerCacheSizeExceededException("Size of hash cache (" + baOut.size() + " bytes) exceeds the maximum allowed size (" + maxSize + " bytes)");
                 }
+                // Evaluate key expressions for hash join key range optimization.
+                if (keyRangeRhsExpression != null) {
+                    ImmutableBytesWritable value = new ImmutableBytesWritable();
+                    keyRangeRhsExpression.reset();
+                    if (keyRangeRhsExpression.evaluate(result, value)) {
+                        keyRangeRhsValues.add(value);
+                    }
+                }
                 nRows++;
             }
             TrustedByteArrayOutputStream sizeOut = new TrustedByteArrayOutputStream(Bytes.SIZEOF_INT);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index 0d2ede9..0ded0b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -47,6 +47,14 @@ public class HintNode {
          */
         SKIP_SCAN,
         /**
+         * Forces a range scan when a full or partial primary key is used as the join key.
+         */
+        RANGE_SCAN_HASH_JOIN,
+        /**
+         * Forces a skip scan when a full or partial primary key is used as the join key.
+         */
+        SKIP_SCAN_HASH_JOIN,
+        /**
          * Prevents the spawning of multiple threads during
          * query processing.
          */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/schema/PArrayDataType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PArrayDataType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PArrayDataType.java
index 6dbf017..8d96b87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PArrayDataType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PArrayDataType.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 
 /**
  * The datatype for PColummns that are Arrays. Any variable length array would follow the below order. 
@@ -643,5 +644,17 @@ public class PArrayDataType {
         }
         
     }
+    
+    public Object getSampleValue(PDataType baseType, Integer arrayLength, Integer elemLength) {
+        Preconditions.checkArgument(arrayLength == null || arrayLength >= 0);
+        if (arrayLength == null) {
+            arrayLength = 1;
+        }
+        Object[] array = new Object[arrayLength];
+        for (int i = 0; i < arrayLength; i++) {
+            array[i] = baseType.getSampleValue(elemLength, arrayLength);
+        }
+        return instantiatePhoenixArray(baseType, array);
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java
index 3d38d64..614eb6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java
@@ -28,6 +28,7 @@ import java.sql.Types;
 import java.text.Format;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
@@ -175,6 +176,20 @@ public enum PDataType {
             }
             return "'" + Bytes.toStringBinary(b, offset, length) + "'";
         }
+
+        private char[] sampleChars = new char[1];
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            Preconditions.checkArgument(maxLength == null || maxLength >= 0);
+            int length = maxLength != null ? maxLength : 1;
+            if (length != sampleChars.length) {
+                sampleChars = new char[length];
+            }
+            for (int i = 0; i < length; i++) {
+                sampleChars[i] = (char) RANDOM.get().nextInt(Byte.MAX_VALUE);
+            }
+            return new String(sampleChars);
+        }
     },
     /**
      * Fixed length single byte characters
@@ -339,6 +354,11 @@ public enum PDataType {
         public String toStringLiteral(byte[] b, int offset, int length, Format formatter) {
             return VARCHAR.toStringLiteral(b, offset, length, formatter);
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return VARCHAR.getSampleValue(maxLength, arrayLength);
+        }
     },
     LONG("BIGINT", Types.BIGINT, Long.class, new LongCodec()) {
         @Override
@@ -534,6 +554,11 @@ public enum PDataType {
                 throw new IllegalDataException(e);
             }
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return RANDOM.get().nextLong();
+        }
     },
     INTEGER("INTEGER", Types.INTEGER, Integer.class, new IntCodec()) {
         @Override
@@ -667,6 +692,11 @@ public enum PDataType {
                 throw new IllegalDataException(e);
             }
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return RANDOM.get().nextInt();
+        }
     },
     SMALLINT("SMALLINT", Types.SMALLINT, Short.class, new ShortCodec()){
         @Override
@@ -795,7 +825,12 @@ public enum PDataType {
       public boolean isCoercibleTo(PDataType targetType) {
           return this == targetType || INTEGER.isCoercibleTo(targetType);
       }
-      
+
+      @Override
+      public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+          return ((Integer) INTEGER.getSampleValue(maxLength, arrayLength)).shortValue();
+      }
+
     },
     TINYINT("TINYINT", Types.TINYINT, Byte.class, new ByteCodec()) {
         @Override
@@ -920,6 +955,11 @@ public enum PDataType {
           return this == targetType || SMALLINT.isCoercibleTo(targetType);
       }
       
+      @Override
+      public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+          return ((Integer) INTEGER.getSampleValue(maxLength, arrayLength)).byteValue();
+      }
+
     },
     FLOAT("FLOAT", Types.FLOAT, Float.class, new FloatCodec()) {
 
@@ -1092,6 +1132,11 @@ public enum PDataType {
         public boolean isCoercibleTo(PDataType targetType) {
             return this == targetType || DOUBLE.isCoercibleTo(targetType);
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return RANDOM.get().nextFloat();
+        }
     },
     DOUBLE("DOUBLE", Types.DOUBLE, Double.class, new DoubleCodec()) {
 
@@ -1269,6 +1314,11 @@ public enum PDataType {
             return this == targetType || targetType == DECIMAL
                     || targetType == VARBINARY || targetType == BINARY;
         }
+        
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return RANDOM.get().nextDouble();
+        }
     },
     DECIMAL("DECIMAL", Types.DECIMAL, BigDecimal.class, null) {
         @Override
@@ -1631,6 +1681,11 @@ public enum PDataType {
             }
             return super.toStringLiteral(b,offset, length, formatter);
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return new BigDecimal((Long) LONG.getSampleValue(maxLength, arrayLength));
+        }
     },
     TIMESTAMP("TIMESTAMP", Types.TIMESTAMP, Timestamp.class, new DateCodec()) {
 
@@ -1803,6 +1858,11 @@ public enum PDataType {
             long millis = PDataType.LONG.getCodec().decodeLong(ptr.get(),ptr.getOffset(), sortOrder);
             return millis;
         }
+        
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return new Timestamp((Long) LONG.getSampleValue(maxLength, arrayLength));
+        }
 
     },
     TIME("TIME", Types.TIME, Time.class, new DateCodec()) {
@@ -1913,6 +1973,11 @@ public enum PDataType {
             // TODO: different default formatter for TIME?
             return DATE.toStringLiteral(b, offset, length, formatter);
         }
+        
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return new Time((Long) LONG.getSampleValue(maxLength, arrayLength));
+        }
     },
     DATE("DATE", Types.DATE, Date.class, new DateCodec()) { // After TIMESTAMP and DATE to ensure toLiteral finds those first
 
@@ -2060,6 +2125,11 @@ public enum PDataType {
             }
             super.coerceBytes(ptr, object, actualType, maxLength, scale, actualModifier, desiredMaxLength, desiredScale, expectedModifier);
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return new Date((Long) LONG.getSampleValue(maxLength, arrayLength));
+        }
     },
     UNSIGNED_TIMESTAMP("UNSIGNED_TIMESTAMP", 19, Timestamp.class, null) {
 
@@ -2166,6 +2236,11 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.TIMESTAMP;
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return new Timestamp((Long) UNSIGNED_LONG.getSampleValue(maxLength, arrayLength));
+        }
     },
     UNSIGNED_TIME("UNSIGNED_TIME", 18, Time.class, new UnsignedDateCodec()) {
 
@@ -2242,6 +2317,11 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.TIME;
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return new Time((Long) UNSIGNED_LONG.getSampleValue(maxLength, arrayLength));
+        }
     },
     UNSIGNED_DATE("UNSIGNED_DATE", 19, Date.class, new UnsignedDateCodec()) { // After TIMESTAMP and DATE to ensure toLiteral finds those first
 
@@ -2345,6 +2425,11 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.DATE;
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return new Date((Long) UNSIGNED_LONG.getSampleValue(maxLength, arrayLength));
+        }
     },
     /**
      * Unsigned long type that restricts values to be from 0 to {@link java.lang.Long#MAX_VALUE} inclusive. May be used to map to existing HTable values created through {@link org.apache.hadoop.hbase.util.Bytes#toBytes(long)}
@@ -2441,6 +2526,11 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return LONG.getResultSetSqlType();
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return Math.abs((Long) LONG.getSampleValue(maxLength, arrayLength));
+        }
     },
     /**
      * Unsigned integer type that restricts values to be from 0 to {@link java.lang.Integer#MAX_VALUE} inclusive. May be used to map to existing HTable values created through {@link org.apache.hadoop.hbase.util.Bytes#toBytes(int)}
@@ -2532,6 +2622,11 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return INTEGER.getResultSetSqlType();
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return Math.abs((Integer) INTEGER.getSampleValue(maxLength, arrayLength));
+        }
     },
     UNSIGNED_SMALLINT("UNSIGNED_SMALLINT", 13, Short.class, new UnsignedShortCodec()) {
         @Override
@@ -2626,6 +2721,11 @@ public enum PDataType {
       public int getResultSetSqlType() {
           return SMALLINT.getResultSetSqlType();
       }
+
+      @Override
+      public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+          return ((Integer) RANDOM.get().nextInt(Short.MAX_VALUE)).shortValue();
+      }
     },
     UNSIGNED_TINYINT("UNSIGNED_TINYINT", 11, Byte.class, new UnsignedByteCodec()) {
         @Override
@@ -2718,6 +2818,11 @@ public enum PDataType {
       public int getResultSetSqlType() {
           return TINYINT.getResultSetSqlType();
       }
+
+      @Override
+      public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+          return ((Integer) RANDOM.get().nextInt(Byte.MAX_VALUE)).byteValue();
+      }
     },
     UNSIGNED_FLOAT("UNSIGNED_FLOAT", 14, Float.class, new UnsignedFloatCodec()) {
 
@@ -2808,6 +2913,11 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return FLOAT.getResultSetSqlType();
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return Math.abs((Float) FLOAT.getSampleValue(maxLength, arrayLength));
+        }
     },
     UNSIGNED_DOUBLE("UNSIGNED_DOUBLE", 15, Double.class, new UnsignedDoubleCodec()) {
 
@@ -2901,6 +3011,11 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return  DOUBLE.getResultSetSqlType();
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return Math.abs((Double) DOUBLE.getSampleValue(maxLength, arrayLength));
+        }
     },
     BOOLEAN("BOOLEAN", Types.BOOLEAN, Boolean.class, null) {
 
@@ -2999,6 +3114,11 @@ public enum PDataType {
             }
             return throwConstraintViolationException(actualType,this);
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return RANDOM.get().nextBoolean();
+        }
     },
     VARBINARY("VARBINARY", Types.VARBINARY, byte[].class, null) {
         @Override
@@ -3124,6 +3244,14 @@ public enum PDataType {
             buf.setCharAt(buf.length()-1, ']');
             return buf.toString();
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            int length = maxLength != null && maxLength > 0 ? maxLength : 1;
+            byte[] b = new byte[length];
+            RANDOM.get().nextBytes(b);
+            return b;
+        }
     },
     BINARY("BINARY", Types.BINARY, byte[].class, null) {
         @Override
@@ -3268,6 +3396,11 @@ public enum PDataType {
             }
             return VARBINARY.toStringLiteral(b, offset, length, formatter);
         }
+
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return VARBINARY.getSampleValue(maxLength, arrayLength);
+        }
     },
     INTEGER_ARRAY("INTEGER_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.INTEGER.getSqlType(), PhoenixArray.class, null) {
     	@Override
@@ -3351,6 +3484,11 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.INTEGER, arrayLength, maxLength);
+        }
 		
 	},
     BOOLEAN_ARRAY("BOOLEAN_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.BOOLEAN.getSqlType(), PhoenixArray.class, null) {
@@ -3435,6 +3573,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.BOOLEAN, arrayLength, maxLength);
+        }
 		
 	},
 	VARCHAR_ARRAY("VARCHAR_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.VARCHAR.getSqlType(), PhoenixArray.class, null) {
@@ -3525,6 +3667,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.VARCHAR, arrayLength, maxLength);
+        }
 		
 	},
 	VARBINARY_ARRAY("VARBINARY_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.VARBINARY.getSqlType(), PhoenixArray.class, null) {
@@ -3616,6 +3762,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.VARBINARY, arrayLength, maxLength);
+        }
 	},
 	BINARY_ARRAY("BINARY_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.BINARY.getSqlType(), PhoenixArray.class, null) {
 		@Override
@@ -3706,6 +3856,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.BINARY, arrayLength, maxLength);
+        }
     },
 	CHAR_ARRAY("CHAR_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.CHAR.getSqlType(), PhoenixArray.class, null) {
 		@Override
@@ -3796,6 +3950,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.CHAR, arrayLength, maxLength);
+        }
 		
 	},
 	LONG_ARRAY("LONG_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.LONG.getSqlType(), PhoenixArray.class, null) {
@@ -3879,6 +4037,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.LONG, arrayLength, maxLength);
+        }
 		
 	},
 	SMALLINT_ARRAY("SMALLINT_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.SMALLINT.getSqlType(), PhoenixArray.class, null) {
@@ -3962,6 +4124,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.SMALLINT, arrayLength, maxLength);
+        }
 		
 	},
 	TINYINT_ARRAY("TINYINT_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.TINYINT.getSqlType(), PhoenixArray.class, null) {
@@ -4045,6 +4211,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.TINYINT, arrayLength, maxLength);
+        }
 		
 	},
 	FLOAT_ARRAY("FLOAT_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.FLOAT.getSqlType(), PhoenixArray.class, null) {
@@ -4129,6 +4299,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.FLOAT, arrayLength, maxLength);
+        }
 		
 	},
 	DOUBLE_ARRAY("DOUBLE_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.DOUBLE.getSqlType(), PhoenixArray.class, null) {
@@ -4213,6 +4387,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.DOUBLE, arrayLength, maxLength);
+        }
 
 	},
 	
@@ -4305,6 +4483,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.DECIMAL, arrayLength, maxLength);
+        }
 	
 	},
 	TIMESTAMP_ARRAY("TIMESTAMP_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.TIMESTAMP.getSqlType(), PhoenixArray.class,
@@ -4389,6 +4571,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.TIMESTAMP, arrayLength, maxLength);
+        }
 
 	},
 	UNSIGNED_TIMESTAMP_ARRAY("UNSIGNED_TIMESTAMP_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_TIMESTAMP.getSqlType(), PhoenixArray.class,
@@ -4473,6 +4659,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_TIMESTAMP, arrayLength, maxLength);
+        }
 
 	},
 	TIME_ARRAY("TIME_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.TIME.getSqlType(), PhoenixArray.class, null) {
@@ -4556,6 +4746,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.TIME, arrayLength, maxLength);
+        }
 
 	},
 	UNSIGNED_TIME_ARRAY("UNSIGNED_TIME_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_TIME.getSqlType(), PhoenixArray.class, null) {
@@ -4639,6 +4833,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_TIME, arrayLength, maxLength);
+        }
 
 	},
 	DATE_ARRAY("DATE_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.DATE.getSqlType(), PhoenixArray.class, null) {
@@ -4722,6 +4920,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.DATE, arrayLength, maxLength);
+        }
 
 	},
 	UNSIGNED_DATE_ARRAY("UNSIGNED_DATE_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_DATE.getSqlType(), PhoenixArray.class, null) {
@@ -4805,6 +5007,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_DATE, arrayLength, maxLength);
+        }
 
 	},
 	UNSIGNED_LONG_ARRAY("UNSIGNED_LONG_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_LONG.getSqlType(), PhoenixArray.class, null) {
@@ -4888,6 +5094,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_LONG, arrayLength, maxLength);
+        }
 	
 	},
 	UNSIGNED_INT_ARRAY("UNSIGNED_INT_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_INT.getSqlType(), PhoenixArray.class, null) {
@@ -4971,6 +5181,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_INT, arrayLength, maxLength);
+        }
 
 	},
 	UNSIGNED_SMALLINT_ARRAY("UNSIGNED_SMALLINT_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_SMALLINT.getSqlType(),
@@ -5055,6 +5269,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_SMALLINT, arrayLength, maxLength);
+        }
 
 	},
 	UNSIGNED_TINYINT_ARRAY("UNSIGNED_TINYINT__ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_TINYINT.getSqlType(), PhoenixArray.class,
@@ -5139,6 +5357,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_TINYINT, arrayLength, maxLength);
+        }
 	},
 	UNSIGNED_FLOAT_ARRAY("UNSIGNED_FLOAT_ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_FLOAT.getSqlType(), PhoenixArray.class, null) {
 		@Override
@@ -5221,6 +5443,10 @@ public enum PDataType {
             pDataTypeForArray.coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
                     this, actualModifer, desiredModifier);
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_FLOAT, arrayLength, maxLength);
+        }
 
 	},
 	UNSIGNED_DOUBLE_ARRAY("UNSIGNED_DOUBLE__ARRAY", PDataType.ARRAY_TYPE_BASE + PDataType.UNSIGNED_DOUBLE.getSqlType(), PhoenixArray.class,
@@ -5306,6 +5532,10 @@ public enum PDataType {
         public int getResultSetSqlType() {
             return Types.ARRAY;
         }
+        @Override
+        public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+            return pDataTypeForArray.getSampleValue(PDataType.UNSIGNED_DOUBLE, arrayLength, maxLength);
+        }
 		
 	};
 
@@ -6538,6 +6768,13 @@ public enum PDataType {
 
     public static final int ARRAY_TYPE_BASE = 3000;
     
+    private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>(){
+        @Override 
+        protected Random initialValue() {
+            return new Random();
+        }
+    };
+    
     /**
      * Serialize a BigDecimal into a variable length byte array in such a way that it is
      * binary comparable.
@@ -6909,6 +7146,19 @@ public enum PDataType {
      * Each enum must override this to define the set of objects it may create
      */
     public abstract Object toObject(byte[] bytes, int offset, int length, PDataType actualType, SortOrder sortOrder, Integer maxLength, Integer scale);
+
+    /*
+     * Return a valid object of this enum type
+     */
+    public abstract Object getSampleValue(Integer maxLength, Integer arrayLength);
+    
+    public final Object getSampleValue() {
+        return getSampleValue(null);
+    }
+    
+    public final Object getSampleValue(Integer maxLength) {
+        return getSampleValue(maxLength, null);
+    }
     
     public final Object toObject(byte[] bytes, int offset, int length, PDataType actualType, SortOrder sortOrder) {
         return toObject(bytes, offset, length, actualType, sortOrder, null, null);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/27a6ccef/phoenix-core/src/test/java/org/apache/phoenix/schema/PDataTypeTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/PDataTypeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/PDataTypeTest.java
index e952d8b..7e9efcd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/PDataTypeTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/PDataTypeTest.java
@@ -1644,6 +1644,25 @@ public class PDataTypeTest {
             startWith = next;
         }
     }
+    
+    @Test
+    public void testGetSampleValue() {
+        PDataType[] types = PDataType.values();
+        // Test validity of 10 sample values for each type
+        for (int i = 0; i < 10; i++) {
+            for (PDataType type : types) {
+                Integer maxLength = 
+                        (type == PDataType.CHAR 
+                        || type == PDataType.BINARY 
+                        || type == PDataType.CHAR_ARRAY 
+                        || type == PDataType.BINARY_ARRAY) ? 10 : null;
+                int arrayLength = 10;
+                Object sampleValue = type.getSampleValue(maxLength, arrayLength);
+                byte[] b = type.toBytes(sampleValue);
+                type.toObject(b, 0, b.length, type, SortOrder.getDefault(), maxLength, null);
+            }
+        }
+    }
 
     // Simulate what an HBase Increment does with the value encoded as a long
     private long nextValueFor(long startWith, long incrementBy) {
@@ -1652,4 +1671,4 @@ public class PDataTypeTest {
         return (Long)PDataType.LONG.toObject(Bytes.toBytes(hstartWith));
     }
     
-}
\ No newline at end of file
+}