Posted to commits@phoenix.apache.org by an...@apache.org on 2017/09/09 08:02:06 UTC

[2/2] phoenix git commit: PHOENIX-4010 Hash Join cache may not be sent to all regionservers when we have a stale HBase meta cache

PHOENIX-4010 Hash Join cache may not be sent to all regionservers when we have a stale HBase meta cache


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7865a59b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7865a59b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7865a59b

Branch: refs/heads/master
Commit: 7865a59b0bbc4ea5e8ab775b920f5f608115707b
Parents: 44c0034
Author: Ankit Singhal <an...@gmail.com>
Authored: Sat Sep 9 13:31:55 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Sat Sep 9 13:31:55 2017 +0530

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/BaseJoinIT.java  |  14 +
 .../apache/phoenix/end2end/HashJoinCacheIT.java | 452 +++++++++++++++++++
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 122 +++--
 .../end2end/index/PartialIndexRebuilderIT.java  |  23 +-
 .../DelayedTableResultIteratorFactory.java      |  12 +-
 .../apache/phoenix/cache/ServerCacheClient.java | 338 +++++++++-----
 .../coprocessor/BaseScannerRegionObserver.java  |   1 +
 .../HashJoinCacheNotFoundException.java         |  45 ++
 .../coprocessor/HashJoinRegionScanner.java      |  11 +-
 .../phoenix/exception/SQLExceptionCode.java     |   3 +-
 .../apache/phoenix/execute/AggregatePlan.java   |  10 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  27 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |  16 +-
 .../execute/LiteralResultIterationPlan.java     |   9 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  11 +-
 .../phoenix/iterate/BaseResultIterators.java    |  51 ++-
 .../DefaultTableResultIteratorFactory.java      |   7 +-
 .../phoenix/iterate/ParallelIterators.java      |  14 +-
 .../apache/phoenix/iterate/SerialIterators.java |  15 +-
 .../phoenix/iterate/TableResultIterator.java    |  56 ++-
 .../iterate/TableResultIteratorFactory.java     |   6 +-
 .../apache/phoenix/join/HashCacheClient.java    |  20 +-
 .../apache/phoenix/query/QueryConstants.java    |   2 +
 .../java/org/apache/phoenix/util/ByteUtil.java  |  15 +
 .../org/apache/phoenix/util/ServerUtil.java     |   7 +
 .../java/org/apache/phoenix/query/BaseTest.java |   1 +
 .../query/ParallelIteratorsSplitTest.java       |   2 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |  28 ++
 28 files changed, 1033 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
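In short: the client builds the hash-join cache and pushes it only to the region servers it knows about from its (possibly stale) HBase meta cache, so after a region moves the hosting server may never receive the cache and the join scan fails with a missing-cache error. With this patch the server raises the new HashJoinCacheNotFoundException when a join id is absent from its cache, and the client side (ServerCacheClient, HashJoinPlan, BaseResultIterators) reacts by resending the cache and retrying the scan. The toy sketch below models that handshake with plain Java collections; CacheMissException, ServerSide and ResendSketch are hypothetical stand-ins for illustration only, not Phoenix classes.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins: the real pieces are HashJoinRegionScanner /
// HashJoinCacheNotFoundException on the server and ServerCacheClient on the client.
class CacheMissException extends Exception {
    final String server;
    CacheMissException(String server) { this.server = server; }
}

class ServerSide {
    // per-server set of join cache ids that have actually been received
    static final Map<String, Set<String>> CACHES = new HashMap<>();

    static void addCache(String server, String joinId) {
        CACHES.computeIfAbsent(server, s -> new HashSet<>()).add(joinId);
    }

    static void scan(String server, String joinId) throws CacheMissException {
        if (!CACHES.getOrDefault(server, new HashSet<>()).contains(joinId)) {
            // After PHOENIX-4010 the miss is reported back to the client
            // instead of failing the query outright.
            throw new CacheMissException(server);
        }
        System.out.println(server + ": join scan served from cache " + joinId);
    }
}

public class ResendSketch {
    public static void main(String[] args) throws Exception {
        String joinId = "join-1";
        // Stale meta cache: the client only pushed the cache to rs1, but one of
        // the regions it needs to scan has moved to rs2.
        ServerSide.addCache("rs1", joinId);
        for (String server : new String[] { "rs1", "rs2" }) {
            try {
                ServerSide.scan(server, joinId);
            } catch (CacheMissException e) {
                // Client-side recovery: resend the hash cache to the server that
                // reported the miss, then retry the scan once.
                ServerSide.addCache(e.server, joinId);
                ServerSide.scan(e.server, joinId);
            }
        }
    }
}

The integration tests below also set ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER to "true" on their connections, presumably so that this resend path is always exercised during the tests.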


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
index 152bdf0..a823a72 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -28,16 +29,23 @@ import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.junit.Before;
+import org.junit.BeforeClass;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 public abstract class BaseJoinIT extends ParallelStatsDisabledIT {
+	
     protected static final String JOIN_SCHEMA = "Join";
     protected static final String JOIN_ORDER_TABLE = "OrderTable";
     protected static final String JOIN_CUSTOMER_TABLE = "CustomerTable";
@@ -442,6 +450,12 @@ public abstract class BaseJoinIT extends ParallelStatsDisabledIT {
         conn.commit();
     }
 
+	protected Connection getConnection() throws SQLException {
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true");
+		return DriverManager.getConnection(getUrl(), props);
+	}
+	
     protected void createIndexes(Connection conn, String virtualName, String realName) throws Exception {
         if (indexDDL != null && indexDDL.length > 0) {
             for (String ddl : indexDDL) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
new file mode 100644
index 0000000..76f45e2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class HashJoinCacheIT extends HashJoinIT {
+    
+    public HashJoinCacheIT(String[] indexDDL, String[] plans) throws Exception {
+        super(indexDDL, plans);
+    }
+    
+    protected String getTableName(Connection conn, String virtualName) throws Exception {
+        String realName = super.getTableName(conn, virtualName);
+        TestUtil.addCoprocessor(conn, SchemaUtil.normalizeFullTableName(realName), InvalidateHashCache.class);
+        return realName;
+    }
+        
+    @Parameters
+    public static Collection<Object> data() {
+        List<Object> testCases = Lists.newArrayList();
+        testCases.add(new String[][] {
+            {}, {
+            /* 
+             * testLeftJoinWithAggregation()
+             *     SELECT i.name, sum(quantity) FROM joinOrderTable o 
+             *     LEFT JOIN joinItemTable i ON o.item_id = i.item_id 
+             *     GROUP BY i.name ORDER BY i.name
+             */     
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME,
+            /* 
+             * testLeftJoinWithAggregation()
+             *     SELECT i.item_id iid, sum(quantity) q FROM joinOrderTable o 
+             *     LEFT JOIN joinItemTable i ON o.item_id = i.item_id 
+             *     GROUP BY i.item_id ORDER BY q DESC"
+             */     
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    SERVER AGGREGATE INTO DISTINCT ROWS BY [\"I.item_id\"]\n" +
+            "CLIENT MERGE SORT\n" +
+            "CLIENT SORTED BY [SUM(O.QUANTITY) DESC]\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "            SERVER FILTER BY FIRST KEY ONLY",
+            /* 
+             * testLeftJoinWithAggregation()
+             *     SELECT i.item_id iid, sum(quantity) q FROM joinItemTable i 
+             *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id 
+             *     GROUP BY i.item_id ORDER BY q DESC NULLS LAST, iid
+             */     
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER FILTER BY FIRST KEY ONLY\n" +
+            "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"I.item_id\"]\n" +
+            "CLIENT SORTED BY [SUM(O.QUANTITY) DESC NULLS LAST, \"I.item_id\"]\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME,
+            /* 
+             * testRightJoinWithAggregation()
+             *     SELECT i.name, sum(quantity) FROM joinOrderTable o 
+             *     RIGHT JOIN joinItemTable i ON o.item_id = i.item_id 
+             *     GROUP BY i.name ORDER BY i.name
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME,
+            /*
+             * testRightJoinWithAggregation()
+             *     SELECT i.item_id iid, sum(quantity) q FROM joinOrderTable o 
+             *     RIGHT JOIN joinItemTable i ON o.item_id = i.item_id 
+             *     GROUP BY i.item_id ORDER BY q DESC NULLS LAST, iid
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER FILTER BY FIRST KEY ONLY\n" +
+            "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"I.item_id\"]\n" +
+            "CLIENT SORTED BY [SUM(O.QUANTITY) DESC NULLS LAST, \"I.item_id\"]\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME,
+            /*
+             * testJoinWithWildcard()
+             *     SELECT * FROM joinItemTable LEFT JOIN joinSupplierTable supp 
+             *     ON joinItemTable.supplier_id = supp.supplier_id 
+             *     ORDER BY item_id
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + 
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME,
+            /*
+             * testJoinPlanWithIndex()
+             *     SELECT item.item_id, item.name, supp.supplier_id, supp.name 
+             *     FROM joinItemTable item LEFT JOIN joinSupplierTable supp 
+             *     ON substr(item.name, 2, 1) = substr(supp.name, 2, 1) 
+             *         AND (supp.name BETWEEN 'S1' AND 'S5') 
+             *     WHERE item.name BETWEEN 'T1' AND 'T5'
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER FILTER BY (NAME >= 'T1' AND NAME <= 'T5')\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+            "            SERVER FILTER BY (NAME >= 'S1' AND NAME <= 'S5')",
+            /*
+             * testJoinPlanWithIndex()
+             *     SELECT item.item_id, item.name, supp.supplier_id, supp.name 
+             *     FROM joinItemTable item INNER JOIN joinSupplierTable supp 
+             *     ON item.supplier_id = supp.supplier_id 
+             *     WHERE (item.name = 'T1' OR item.name = 'T5') 
+             *         AND (supp.name = 'S1' OR supp.name = 'S5')
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER FILTER BY (NAME = 'T1' OR NAME = 'T5')\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+            "            SERVER FILTER BY (NAME = 'S1' OR NAME = 'S5')",
+            /*
+             * testJoinWithSkipMergeOptimization()
+             *     SELECT s.name FROM joinItemTable i 
+             *     JOIN joinOrderTable o ON o.item_id = i.item_id AND quantity < 5000 
+             *     JOIN joinSupplierTable s ON i.supplier_id = s.supplier_id
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    PARALLEL INNER-JOIN TABLE 0 (SKIP MERGE)\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "            SERVER FILTER BY QUANTITY < 5000\n" +
+            "    PARALLEL INNER-JOIN TABLE 1\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+            "    DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")",
+            /*
+             * testSelfJoin()
+             *     SELECT i2.item_id, i1.name FROM joinItemTable i1 
+             *     JOIN joinItemTable i2 ON i1.item_id = i2.item_id 
+             *     ORDER BY i1.item_id
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "            SERVER FILTER BY FIRST KEY ONLY\n" +
+            "    DYNAMIC SERVER FILTER BY \"I1.item_id\" IN (\"I2.item_id\")",
+            /*
+             * testSelfJoin()
+             *     SELECT i1.name, i2.name FROM joinItemTable i1 
+             *     JOIN joinItemTable i2 ON i1.item_id = i2.supplier_id 
+             *     ORDER BY i1.name, i2.name
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER SORTED BY [I1.NAME, I2.NAME]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    DYNAMIC SERVER FILTER BY \"I1.item_id\" IN (\"I2.supplier_id\")",
+            /*
+             * testStarJoin()
+             *     SELECT order_id, c.name, i.name iname, quantity, o.date 
+             *     FROM joinOrderTable o 
+             *     JOIN joinCustomerTable c ON o.customer_id = c.customer_id 
+             *     JOIN joinItemTable i ON o.item_id = i.item_id 
+             *     ORDER BY order_id
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + "\n" +
+            "    PARALLEL INNER-JOIN TABLE 1\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME,
+            /*
+             * testStarJoin()
+             *     SELECT (*NO_STAR_JOIN*) order_id, c.name, i.name iname, quantity, o.date 
+             *     FROM joinOrderTable o 
+             *     JOIN joinCustomerTable c ON o.customer_id = c.customer_id 
+             *     JOIN joinItemTable i ON o.item_id = i.item_id 
+             *     ORDER BY order_id
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER SORTED BY [\"O.order_id\"]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "            PARALLEL INNER-JOIN TABLE 0\n" +
+            "                CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + "\n" +
+            "    DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")",
+            /*
+             * testSubJoin()
+             *     SELECT * FROM joinCustomerTable c 
+             *     INNER JOIN (joinOrderTable o 
+             *         INNER JOIN (joinSupplierTable s 
+             *             RIGHT JOIN joinItemTable i ON i.supplier_id = s.supplier_id)
+             *         ON o.item_id = i.item_id)
+             *     ON c.customer_id = o.customer_id
+             *     WHERE c.customer_id <= '0000000005' 
+             *         AND order_id != '000000000000003' 
+             *         AND i.name != 'T3' 
+             *     ORDER BY c.customer_id, i.name
+             */
+            "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + " [*] - ['0000000005']\n" +
+            "    SERVER SORTED BY [\"C.customer_id\", I.NAME]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "            SERVER FILTER BY \"order_id\" != '000000000000003'\n" +
+            "            PARALLEL INNER-JOIN TABLE 0\n" +
+            "                CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "                    SERVER FILTER BY NAME != 'T3'\n" +
+            "                    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+            "    DYNAMIC SERVER FILTER BY \"C.customer_id\" IN (\"O.customer_id\")",
+            /* 
+             * testJoinWithSubqueryAndAggregation()
+             *     SELECT i.name, sum(quantity) FROM joinOrderTable o 
+             *     LEFT JOIN (SELECT name, item_id iid FROM joinItemTable) AS i 
+             *     ON o.item_id = i.iid 
+             *     GROUP BY i.name ORDER BY i.name
+             */     
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME,
+            /* 
+             * testJoinWithSubqueryAndAggregation()
+             *     SELECT o.iid, sum(o.quantity) q 
+             *     FROM (SELECT item_id iid, quantity FROM joinOrderTable) AS o 
+             *     LEFT JOIN (SELECT item_id FROM joinItemTable) AS i 
+             *     ON o.iid = i.item_id 
+             *     GROUP BY o.iid ORDER BY q DESC                 
+             */     
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    SERVER AGGREGATE INTO DISTINCT ROWS BY [O.IID]\n" +
+            "CLIENT MERGE SORT\n" +
+            "CLIENT SORTED BY [SUM(O.QUANTITY) DESC]\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0 (SKIP MERGE)\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "            SERVER FILTER BY FIRST KEY ONLY",
+            /* 
+             * testJoinWithSubqueryAndAggregation()
+             *     SELECT i.iid, o.q 
+             *     FROM (SELECT item_id iid FROM joinItemTable) AS i 
+             *     LEFT JOIN (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o 
+             *     ON o.iid = i.iid 
+             *     ORDER BY o.q DESC NULLS LAST, i.iid
+             */     
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER FILTER BY FIRST KEY ONLY\n" +
+            "    SERVER SORTED BY [O.Q DESC NULLS LAST, I.IID]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "            SERVER AGGREGATE INTO DISTINCT ROWS BY [\"item_id\"]\n" +
+            "        CLIENT MERGE SORT",
+            /* 
+             * testJoinWithSubqueryAndAggregation()
+             *     SELECT i.iid, o.q 
+             *     FROM (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o 
+             *     JOIN (SELECT item_id iid FROM joinItemTable) AS i 
+             *     ON o.iid = i.iid 
+             *     ORDER BY o.q DESC, i.iid
+             */     
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    SERVER FILTER BY FIRST KEY ONLY\n" +
+            "    SERVER SORTED BY [O.Q DESC, I.IID]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "            SERVER AGGREGATE INTO DISTINCT ROWS BY [\"item_id\"]\n" +
+            "        CLIENT MERGE SORT",
+            /*
+             * testNestedSubqueries()
+             *     SELECT * FROM (SELECT customer_id cid, name, phone, address, loc_id, date FROM joinCustomerTable) AS c 
+             *     INNER JOIN (SELECT o.oid ooid, o.cid ocid, o.iid oiid, o.price * o.quantity, o.date odate, 
+             *     qi.iiid iiid, qi.iname iname, qi.iprice iprice, qi.idiscount1 idiscount1, qi.idiscount2 idiscount2, qi.isid isid, qi.idescription idescription, 
+             *     qi.ssid ssid, qi.sname sname, qi.sphone sphone, qi.saddress saddress, qi.sloc_id sloc_id 
+             *         FROM (SELECT item_id iid, customer_id cid, order_id oid, price, quantity, date FROM joinOrderTable) AS o 
+             *         INNER JOIN (SELECT i.iid iiid, i.name iname, i.price iprice, i.discount1 idiscount1, i.discount2 idiscount2, i.sid isid, i.description idescription, 
+             *         s.sid ssid, s.name sname, s.phone sphone, s.address saddress, s.loc_id sloc_id 
+             *             FROM (SELECT supplier_id sid, name, phone, address, loc_id FROM joinSupplierTable) AS s 
+             *             RIGHT JOIN (SELECT item_id iid, name, price, discount1, discount2, supplier_id sid, description FROM joinItemTable) AS i 
+             *             ON i.sid = s.sid) as qi 
+             *         ON o.iid = qi.iiid) as qo 
+             *     ON c.cid = qo.ocid 
+             *     WHERE c.cid <= '0000000005' 
+             *         AND qo.ooid != '000000000000003' 
+             *         AND qo.iname != 'T3' 
+             *     ORDER BY c.cid, qo.iname
+             */
+            "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + " [*] - ['0000000005']\n" +
+            "    SERVER SORTED BY [C.CID, QO.INAME]\n" +
+            "CLIENT MERGE SORT\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "            SERVER FILTER BY \"order_id\" != '000000000000003'\n" +
+            "            PARALLEL INNER-JOIN TABLE 0\n" +
+            "                CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "                    SERVER FILTER BY NAME != 'T3'\n" +
+            "                    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME,
+            /*
+             * testJoinWithLimit()
+             *     SELECT order_id, i.name, s.name, s.address, quantity 
+             *     FROM joinSupplierTable s 
+             *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+             *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+             */
+            "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+            "    SERVER 4 ROW LIMIT\n" +
+            "CLIENT 4 ROW LIMIT\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    JOIN-SCANNER 4 ROW LIMIT",
+            /*
+             * testJoinWithLimit()
+             *     SELECT order_id, i.name, s.name, s.address, quantity 
+             *     FROM joinSupplierTable s 
+             *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+             *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+            "CLIENT 4 ROW LIMIT\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.supplier_id\")\n" +
+            "    JOIN-SCANNER 4 ROW LIMIT",
+            /*
+             * testJoinWithSetMaxRows()
+             *     statement.setMaxRows(4);
+             *     SELECT order_id, i.name, quantity FROM joinItemTable i
+             *     JOIN joinOrderTable o ON o.item_id = i.item_id;
+             *     SELECT o.order_id, i.name, o.quantity FROM joinItemTable i
+             *     JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o
+             *     ON o.item_id = i.item_id;
+             */
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "CLIENT 4 ROW LIMIT\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")\n" +
+            "    JOIN-SCANNER 4 ROW LIMIT",
+            /*
+             * testJoinWithOffset()
+             *     SELECT order_id, i.name, s.name, s.address, quantity 
+             *     FROM joinSupplierTable s 
+             *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+             *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+             */
+            "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+            "    SERVER OFFSET 2\n" +
+            "    SERVER 3 ROW LIMIT\n" +
+            "CLIENT 1 ROW LIMIT\n" +
+            "    PARALLEL LEFT-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    JOIN-SCANNER 3 ROW LIMIT",
+            /*
+             * testJoinWithOffset()
+             *     SELECT order_id, i.name, s.name, s.address, quantity 
+             *     FROM joinSupplierTable s 
+             *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+             *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+             */
+            "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+            "    SERVER OFFSET 2\n" +
+            "CLIENT 1 ROW LIMIT\n" +
+            "    PARALLEL INNER-JOIN TABLE 0\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+            "    PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+            "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+            "    DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.supplier_id\")\n" +
+            "    JOIN-SCANNER 3 ROW LIMIT",
+            }});
+        return testCases;
+    }
+    
+    @Test
+    public void testInnerJoin() throws Exception {
+        // This test involves sequences, which may be incremented on retry when the
+        // hash cache is removed; that can make the test flaky, so it is skipped for this case.
+    }
+
+	@Test
+	public void testUpsertWithJoin() throws Exception {
+        // TODO: enable this test once PHOENIX-3163 is fixed
+	}
+    
+    public static class InvalidateHashCache extends SimpleRegionObserver {
+        public static Random rand= new Random();
+        public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();
+        @Override
+        public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+                final RegionScanner s) throws IOException {
+            final HashJoinInfo joinInfo = HashJoinInfo.deserializeHashJoinFromScan(scan);
+            if (joinInfo != null) {
+                TenantCache cache = GlobalCache.getTenantCache(c.getEnvironment(), null);
+                int count = joinInfo.getJoinIds().length;
+                for (int i = 0; i < count; i++) {
+                    ImmutableBytesPtr joinId = joinInfo.getJoinIds()[i];
+                    if (!ByteUtil.contains(lastRemovedJoinIds,joinId)) {
+                        lastRemovedJoinIds.add(joinId);
+                        cache.removeServerCache(joinId);
+                    }
+                }
+            }
+            return s;
+        }
+        
+    }
+}
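
The InvalidateHashCache observer above removes each join cache the first time it sees it, so every join in these tests hits the missing-cache path at least once and exercises the client's resend logic. It relies on a ByteUtil.contains(...) helper added by this patch (see the ByteUtil.java entry in the diffstat; its body is not shown in this message). A plausible shape for such a helper, comparing byte content rather than object identity, sketched here with plain byte arrays instead of ImmutableBytesPtr:

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the ByteUtil.contains helper used above.
final class ByteListUtil {
    private ByteListUtil() {}

    // Returns true if any element of the list holds the same bytes as 'key'.
    static boolean contains(List<byte[]> haystack, byte[] key) {
        for (byte[] candidate : haystack) {
            if (Arrays.equals(candidate, key)) {
                return true;
            }
        }
        return false;
    }
}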

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 7bdea5f..edab319 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
@@ -34,23 +35,44 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 @RunWith(Parameterized.class)
 public class HashJoinIT extends BaseJoinIT {
+    
+    
     public HashJoinIT(String[] indexDDL, String[] plans) {
         super(indexDDL, plans);
     }
@@ -1166,8 +1188,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testDefaultJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
         String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"";
@@ -1213,8 +1234,7 @@ public class HashJoinIT extends BaseJoinIT {
 
     @Test
     public void testInnerJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
         String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name, next value for " + seqName + " FROM " + tableName1 + " item INNER JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"";
@@ -1266,8 +1286,7 @@ public class HashJoinIT extends BaseJoinIT {
             
     @Test
     public void testLeftJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
         String query[] = new String[3];
@@ -1325,8 +1344,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testRightJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " supp RIGHT JOIN " + tableName2 + " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
@@ -1377,8 +1395,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testInnerJoinWithPreFilters() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
         String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item INNER JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005'";
@@ -1441,8 +1458,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testLeftJoinWithPreFilters() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
         String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item LEFT JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005') ORDER BY \"item_id\"";
@@ -1493,8 +1509,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithPostFilters() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " supp RIGHT JOIN " + tableName2 + " item ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005'";
@@ -1557,8 +1572,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testStarJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME);
         String tableName3 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
@@ -1631,8 +1645,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testLeftJoinWithAggregation() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String query1 = "SELECT i.name, sum(quantity) FROM " + tableName1 + " o LEFT JOIN " 
@@ -1717,8 +1730,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testRightJoinWithAggregation() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String query1 = "SELECT i.name, sum(quantity) FROM " + tableName1 + " o RIGHT JOIN " 
@@ -1790,8 +1802,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testLeftRightJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME);
         String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
         String tableName3 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
@@ -1896,8 +1907,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testMultiLeftJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String[] queries = {
                 "SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN "
                         + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" LEFT JOIN "
@@ -1949,8 +1959,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testMultiRightJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query = "SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN "
             + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
             + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
@@ -2025,6 +2034,7 @@ public class HashJoinIT extends BaseJoinIT {
     public void testMultiRightJoin_SmallChunkSize() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
+        props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true");
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String query = "SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN "
                 + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
@@ -2096,8 +2106,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithWildcard() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query = "SELECT * FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ".\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
         try {
             PreparedStatement statement = conn.prepareStatement(query);
@@ -2205,8 +2214,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithTableWildcard() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query = "SELECT s.*, "+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ".*, \"order_id\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN " 
                 + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
                 + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
@@ -2351,8 +2359,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinMultiJoinKeys() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query = "SELECT c.name, s.name FROM " + getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME) + " c LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON \"customer_id\" = \"supplier_id\" AND c.loc_id = s.loc_id AND substr(s.name, 2, 1) = substr(c.name, 2, 1)";
         try {
             PreparedStatement statement = conn.prepareStatement(query);
@@ -2384,8 +2391,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithDifferentNumericJoinKeyTypes() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query = "SELECT \"order_id\", i.name, i.price, discount2, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN " 
             + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" AND o.price = (i.price * (100 - discount2)) / 100.0 WHERE quantity < 5000";
         try {
@@ -2406,8 +2412,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithDifferentDateJoinKeyTypes() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query = "SELECT \"order_id\", c.name, o.\"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN "
             + getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME) + " c ON o.\"customer_id\" = c.\"customer_id\" AND o.\"DATE\" = c.\"DATE\"";
         try {
@@ -2438,8 +2443,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithIncomparableJoinKeyTypes() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query = "SELECT \"order_id\", i.name, i.price, discount2, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN " 
             + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" AND o.price / 100 = substr(i.name, 2, 1)";
         try {
@@ -2455,8 +2459,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinPlanWithIndex() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON substr(item.name, 2, 1) = substr(supp.name, 2, 1) AND (supp.name BETWEEN 'S1' AND 'S5') WHERE item.name BETWEEN 'T1' AND 'T5'";
         String query2 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE (item.name = 'T1' OR item.name = 'T5') AND (supp.name = 'S1' OR supp.name = 'S5')";
         try {
@@ -2518,8 +2521,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithSkipMergeOptimization() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query = "SELECT s.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i JOIN " 
             + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" AND quantity < 5000 JOIN "
             + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\"";
@@ -2546,8 +2548,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testSelfJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query1 = "SELECT i2.\"item_id\", i1.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i1 JOIN " 
             + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i2 ON i1.\"item_id\" = i2.\"item_id\" ORDER BY i1.\"item_id\"";
         String query2 = "SELECT i1.name, i2.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i1 JOIN " 
@@ -2615,8 +2616,7 @@ public class HashJoinIT extends BaseJoinIT {
     @Test
     public void testUpsertWithJoin() throws Exception {
         String tempTable = generateUniqueName();
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         conn.setAutoCommit(true);
         try {
             conn.createStatement().execute("CREATE TABLE " + tempTable 
@@ -2768,8 +2768,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testSubJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query1 = "SELECT i.name, count(c.name), min(s.name), max(quantity) FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN " 
                 + "(" + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s RIGHT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\")" 
                 + " ON o.\"item_id\" = i.\"item_id\" LEFT JOIN " 
@@ -2894,8 +2893,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithSubquery() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query1 = "SELECT item.\"item_id\", item.name, supp.sid, supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\" sid, name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " WHERE name BETWEEN 'S1' AND 'S5') AS supp ON item.\"supplier_id\" = supp.sid";
         String query2 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + ") AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name = 'S1' OR supp.name = 'S5')";
         try {
@@ -2956,8 +2954,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithSubqueryPostFilters() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         try {
             String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " LIMIT 5) AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name != 'S1')";
             PreparedStatement statement = conn.prepareStatement(query);
@@ -3006,8 +3003,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithSubqueryAndAggregation() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query1 = "SELECT i.name, sum(quantity) FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN (SELECT name, \"item_id\" iid FROM " 
             + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ") AS i ON o.\"item_id\" = i.iid GROUP BY i.name ORDER BY i.name";
         String query2 = "SELECT o.iid, sum(o.quantity) q FROM (SELECT \"item_id\" iid, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + ") AS o LEFT JOIN (SELECT \"item_id\" FROM " 
@@ -3112,8 +3108,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testNestedSubqueries() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query1 = "SELECT q.iname, count(c.name), min(q.sname), max(o.quantity) FROM (SELECT \"customer_id\" cid, \"item_id\" iid, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + ") AS o LEFT JOIN " 
                 + "(SELECT i.iid iid, s.name sname, i.name iname FROM (SELECT \"supplier_id\" sid, name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + ") AS s RIGHT JOIN (SELECT \"item_id\" iid, name, \"supplier_id\" sid FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ") AS i ON i.sid = s.sid) AS q" 
                 + " ON o.iid = q.iid LEFT JOIN (SELECT \"customer_id\" cid, name FROM " 
@@ -3235,8 +3230,7 @@ public class HashJoinIT extends BaseJoinIT {
     
     @Test
     public void testJoinWithLimit() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s LEFT JOIN " 
                 + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN "
                 + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 4";
@@ -3314,8 +3308,7 @@ public class HashJoinIT extends BaseJoinIT {
 
     @Test
     public void testJoinWithOffset() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME)
                 + " s LEFT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN "
                 + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2 ";
@@ -3357,8 +3350,7 @@ public class HashJoinIT extends BaseJoinIT {
 
     @Test
     public void testNonEquiJoin() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         try {
             String query = "SELECT item.name, supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item, " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp WHERE item.\"supplier_id\" > supp.\"supplier_id\"";
             PreparedStatement statement = conn.prepareStatement(query);
@@ -3414,8 +3406,7 @@ public class HashJoinIT extends BaseJoinIT {
 
     @Test
     public void testJoinWithSetMaxRows() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = getConnection();
         String [] queries = new String[2];
         queries[0] = "SELECT \"order_id\", i.name, quantity FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i JOIN "
                 + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\"";
@@ -3453,4 +3444,5 @@ public class HashJoinIT extends BaseJoinIT {
             conn.close();
         }
     }
+    
 }
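
Note on the HashJoinIT changes: every test above swaps the per-test Properties/DriverManager boilerplate for a shared getConnection() helper. The helper body itself is not part of this hunk; a minimal sketch of what it presumably wraps, based on the removed lines, would be:

    // Hypothetical helper (assumed to live in BaseJoinIT); the removed lines above show
    // the exact pattern it replaces. Not the committed body, just an illustration.
    protected Connection getConnection() throws SQLException {
        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        return DriverManager.getConnection(getUrl(), props);
    }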

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 067f50f..12630f4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -690,7 +690,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
                 conn.commit();
                 doneSignal.await(30, TimeUnit.SECONDS);
                 // Install coprocessor that will simulate an index write failure during index rebuild
-                addWriteFailingCoprocessor(conn,fullIndexName);
+                TestUtil.addCoprocessor(conn,fullIndexName,WriteFailingRegionObserver.class);
                 clock.time += WAIT_AFTER_DISABLED;
                 doneSignal.await(30, TimeUnit.SECONDS);
                 WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
@@ -843,27 +843,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         t.start();
     }
     
-    private static void addWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
-        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
-        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
-        descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), null, priority, null);
-        int numTries = 10;
-        try (HBaseAdmin admin = services.getAdmin()) {
-            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
-            while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
-                    && numTries > 0) {
-                numTries--;
-                if (numTries == 0) {
-                    throw new Exception(
-                            "Check to detect if delaying co-processor was added failed after "
-                                    + numTries + " retries.");
-                }
-                Thread.sleep(1000);
-            }
-        }
-    }
-    
     private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
         ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
         HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
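
Note on the PartialIndexRebuilderIT change: the test now installs its failure-injecting observer through TestUtil.addCoprocessor(conn, tableName, coprocessorClass) instead of the index-specific helper removed above. The committed TestUtil body is not shown in this hunk; a minimal sketch of such a generalized installer, modeled on the removed helper, might look like:

    // Sketch only: a generalized coprocessor installer assumed to mirror the removed helper.
    public static void addCoprocessor(Connection conn, String tableName, Class<?> coprocessorClass)
            throws Exception {
        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
        descriptor.addCoprocessor(coprocessorClass.getName(), null, priority, null);
        try (HBaseAdmin admin = services.getAdmin()) {
            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
            // Poll until the modified descriptor is visible, giving up after ~10 seconds.
            for (int i = 0; i < 10; i++) {
                if (admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)) {
                    return;
                }
                Thread.sleep(1000);
            }
        }
        throw new Exception("Coprocessor " + coprocessorClass.getName() + " was not added to " + tableName);
    }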

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 5e13982..23bfebd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -18,14 +18,18 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -41,16 +45,16 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
     @Override
     public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
             Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
-            QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder,
-                renewLeaseThreshold, plan, scanGrouper);
+                renewLeaseThreshold, plan, scanGrouper, caches);
     }
 
     private class DelayedTableResultIterator extends TableResultIterator {
         public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan,
                 ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan,
-                ParallelScanGrouper scanGrouper) throws SQLException {
-            super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper);
+                ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+            super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches);
         }
         
         @Override
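
Note on the factory change: TableResultIteratorFactory.newIterator() and the TableResultIterator constructor now carry a Map<ImmutableBytesPtr, ServerCache> describing the server caches a scan depends on. A minimal sketch of how a caller might populate and pass that map (keying the map by the cache id is an assumption made for illustration; the actual keying is defined elsewhere in this commit):

    // Hypothetical wiring: hand the ServerCache handles produced by a hash join to the
    // iterator factory so a scan can resend a cache that a region server no longer has.
    Map<ImmutableBytesPtr, ServerCache> caches = new HashMap<ImmutableBytesPtr, ServerCache>();
    caches.put(new ImmutableBytesPtr(serverCache.getId()), serverCache);
    TableResultIterator itr = factory.newIterator(mutationState, tableRef, scan,
            scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches);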

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 18e4034..ce46a3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -21,6 +21,9 @@ import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INS
 import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
 
 import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -56,9 +59,10 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
-import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.join.HashCacheFactory;
+import org.apache.phoenix.memory.InsufficientMemoryException;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -67,6 +71,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
@@ -86,6 +91,7 @@ public class ServerCacheClient {
     public static final byte[] KEY_IN_FIRST_REGION = new byte[]{0};
     private static final Log LOG = LogFactory.getLog(ServerCacheClient.class);
     private static final Random RANDOM = new Random();
+	public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER = "hash.join.server.cache.resend.per.server";
     private final PhoenixConnection connection;
     private final Map<Integer, TableRef> cacheUsingTableRefMap = new ConcurrentHashMap<Integer, TableRef>();
 
@@ -115,12 +121,41 @@ public class ServerCacheClient {
     public class ServerCache implements SQLCloseable {
         private final int size;
         private final byte[] id;
-        private final ImmutableSet<HRegionLocation> servers;
+        private final Set<HRegionLocation> servers;
+        private ImmutableBytesWritable cachePtr;
+        private MemoryChunk chunk;
+        private File outputFile;
         
-        public ServerCache(byte[] id, Set<HRegionLocation> servers, int size) {
+        
+        public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
+                ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
             this.id = id;
-            this.servers = ImmutableSet.copyOf(servers);
-            this.size = size;
+            this.servers = new HashSet<HRegionLocation>(servers);
+            this.size =  cachePtr.getLength();
+            if (storeCacheOnClient) {
+                try {
+                    this.chunk = services.getMemoryManager().allocate(cachePtr.getLength());
+                    this.cachePtr = cachePtr;
+                } catch (InsufficientMemoryException e) {
+                    this.outputFile = File.createTempFile("HashJoinCacheSpooler", ".bin", new File(services.getProps()
+                            .get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY)));
+                    try (FileOutputStream fio = new FileOutputStream(outputFile)) {
+                        fio.write(cachePtr.get(), cachePtr.getOffset(), cachePtr.getLength());
+                    }
+                }
+            }
+            
+        }
+
+        public ImmutableBytesWritable getCachePtr() throws IOException {
+            if(this.outputFile!=null){
+                try (FileInputStream fio = new FileInputStream(outputFile)) {
+                    byte[] b = new byte[this.size];
+                    fio.read(b);
+                    cachePtr = new ImmutableBytesWritable(b);
+                }
+            }
+            return cachePtr;
         }
 
         /**
@@ -136,22 +171,41 @@ public class ServerCacheClient {
         public byte[] getId() {
             return id;
         }
-
+        
+		public boolean addServer(HRegionLocation loc) {
+			return this.servers.add(loc);
+		}
+        
         /**
          * Call to free up cache on region servers when no longer needed
          */
         @Override
         public void close() throws SQLException {
-            removeServerCache(id, servers);
+            try{
+                removeServerCache(this, servers);
+            }finally{
+                cachePtr = null;
+                if (chunk != null) {
+                    chunk.close();
+                }
+                if (outputFile != null) {
+                    outputFile.delete();
+                }
+            }
         }
-
+        
+    }
+    
+    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
+            final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+        return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTableRef, false);
     }
     
-    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
+            final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef, boolean storeCacheOnClient)
+            throws SQLException {
         ConnectionQueryServices services = connection.getQueryServices();
-        MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
         List<Closeable> closeables = new ArrayList<Closeable>();
-        closeables.add(chunk);
         ServerCache hashCacheSpec = null;
         SQLException firstException = null;
         final byte[] cacheId = generateId();
@@ -187,54 +241,7 @@ public class ServerCacheClient {
                         
                         @Override
                         public Boolean call() throws Exception {
-                            final Map<byte[], AddServerCacheResponse> results;
-                            try {
-                                results = htable.coprocessorService(ServerCachingService.class, key, key, 
-                                            new Batch.Call<ServerCachingService, AddServerCacheResponse>() {
-                                                @Override
-                                                public AddServerCacheResponse call(ServerCachingService instance) throws IOException {
-                                                    ServerRpcController controller = new ServerRpcController();
-                                                    BlockingRpcCallback<AddServerCacheResponse> rpcCallback =
-                                                            new BlockingRpcCallback<AddServerCacheResponse>();
-                                                    AddServerCacheRequest.Builder builder = AddServerCacheRequest.newBuilder();
-                                                    final byte[] tenantIdBytes;
-                                                    if(cacheUsingTable.isMultiTenant()) {
-                                                        try {
-                                                            tenantIdBytes = connection.getTenantId() == null ? null :
-                                                                    ScanUtil.getTenantIdBytes(
-                                                                            cacheUsingTable.getRowKeySchema(),
-                                                                            cacheUsingTable.getBucketNum() != null,
-                                                                            connection.getTenantId(), cacheUsingTable.getViewIndexId() != null);
-                                                        } catch (SQLException e) {
-                                                            throw new IOException(e);
-                                                        }
-                                                    } else {
-                                                        tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
-                                                    }
-                                                    if (tenantIdBytes != null) {
-                                                        builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
-                                                    }
-                                                    builder.setCacheId(ByteStringer.wrap(cacheId));
-                                                    builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
-                                                    builder.setHasProtoBufIndexMaintainer(true);
-                                                    ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
-                                                    svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
-                                                    builder.setCacheFactory(svrCacheFactoryBuider.build());
-                                                    builder.setTxState(ByteStringer.wrap(txState));
-                                                    instance.addServerCache(controller, builder.build(), rpcCallback);
-                                                    if(controller.getFailedOn() != null) {
-                                                        throw controller.getFailedOn();
-                                                    }
-                                                    return rpcCallback.get(); 
-                                                }
-                                              });
-                            } catch (Throwable t) {
-                                throw new Exception(t);
-                            }
-                            if(results != null && results.size() == 1){
-                                return results.values().iterator().next().getReturn();
-                            }
-                            return false;
+                            return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState);
                         }
 
                         /**
@@ -257,7 +264,7 @@ public class ServerCacheClient {
                 }
             }
             
-            hashCacheSpec = new ServerCache(cacheId,servers,cachePtr.getLength());
+            hashCacheSpec = new ServerCache(cacheId,servers,cachePtr, services, storeCacheOnClient);
             // Execute in parallel
             int timeoutMs = services.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
             for (Future<Boolean> future : futures) {
@@ -303,73 +310,81 @@ public class ServerCacheClient {
      * @throws SQLException
      * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
      */
-    private void removeServerCache(final byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
-    	ConnectionQueryServices services = connection.getQueryServices();
-    	Throwable lastThrowable = null;
-    	TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
-    	final PTable cacheUsingTable = cacheUsingTableRef.getTable();
-    	byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
-    	HTableInterface iterateOverTable = services.getTable(tableName);
-    	try {
-    		List<HRegionLocation> locations = services.getAllTableRegions(tableName);
-    		Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers);
-    		/**
-    		 * Allow for the possibility that the region we based where to send our cache has split and been
-    		 * relocated to another region server *after* we sent it, but before we removed it. To accommodate
-    		 * this, we iterate through the current metadata boundaries and remove the cache once for each
-    		 * server that we originally sent to.
-    		 */
-    		if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Removing Cache " + cacheId + " from servers.", connection));}
-    		for (HRegionLocation entry : locations) {
-    			if (remainingOnServers.contains(entry)) {  // Call once per server
-    				try {
+    private void removeServerCache(final ServerCache cache, Set<HRegionLocation> remainingOnServers) throws SQLException {
+        HTableInterface iterateOverTable = null;
+        final byte[] cacheId = cache.getId();
+        try {
+            ConnectionQueryServices services = connection.getQueryServices();
+            Throwable lastThrowable = null;
+            TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
+            final PTable cacheUsingTable = cacheUsingTableRef.getTable();
+            byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
+            iterateOverTable = services.getTable(tableName);
+
+            List<HRegionLocation> locations = services.getAllTableRegions(tableName);
+            /**
+             * Allow for the possibility that the region we used to decide where to send our cache has split and been
+             * relocated to another region server *after* we sent the cache, but before we removed it. To accommodate
+             * this, we iterate through the current metadata boundaries and remove the cache once for each server to
+             * which we originally sent it.
+             */
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(addCustomAnnotations("Removing Cache " + cacheId + " from servers.", connection));
+            }
+            for (HRegionLocation entry : locations) {
+                // Call once per server
+                if (remainingOnServers.contains(entry)) {
+                    try {
                         byte[] key = getKeyInRegion(entry.getRegionInfo().getStartKey());
-    					iterateOverTable.coprocessorService(ServerCachingService.class, key, key, 
-    							new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() {
-    						@Override
-    						public RemoveServerCacheResponse call(ServerCachingService instance) throws IOException {
-    							ServerRpcController controller = new ServerRpcController();
-    							BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback =
-    									new BlockingRpcCallback<RemoveServerCacheResponse>();
-    							RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest.newBuilder();
-                                final byte[] tenantIdBytes;
-                                if(cacheUsingTable.isMultiTenant()) {
-                                    try {
-                                        tenantIdBytes = connection.getTenantId() == null ? null :
-                                                ScanUtil.getTenantIdBytes(
-                                                        cacheUsingTable.getRowKeySchema(),
-                                                        cacheUsingTable.getBucketNum() != null,
-                                                        connection.getTenantId(), cacheUsingTable.getViewIndexId() != null);
-                                    } catch (SQLException e) {
-                                        throw new IOException(e);
+                        iterateOverTable.coprocessorService(ServerCachingService.class, key, key,
+                                new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() {
+                                    @Override
+                                    public RemoveServerCacheResponse call(ServerCachingService instance)
+                                            throws IOException {
+                                        ServerRpcController controller = new ServerRpcController();
+                                        BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback = new BlockingRpcCallback<RemoveServerCacheResponse>();
+                                        RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest
+                                                .newBuilder();
+                                        final byte[] tenantIdBytes;
+                                        if (cacheUsingTable.isMultiTenant()) {
+                                            try {
+                                                tenantIdBytes = connection.getTenantId() == null ? null
+                                                        : ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(),
+                                                                cacheUsingTable.getBucketNum() != null,
+                                                                connection.getTenantId(),
+                                                                cacheUsingTable.getViewIndexId() != null);
+                                            } catch (SQLException e) {
+                                                throw new IOException(e);
+                                            }
+                                        } else {
+                                            tenantIdBytes = connection.getTenantId() == null ? null
+                                                    : connection.getTenantId().getBytes();
+                                        }
+                                        if (tenantIdBytes != null) {
+                                            builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
+                                        }
+                                        builder.setCacheId(ByteStringer.wrap(cacheId));
+                                        instance.removeServerCache(controller, builder.build(), rpcCallback);
+                                        if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+                                        return rpcCallback.get();
                                     }
-                                } else {
-                                    tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
-                                }
-                                if (tenantIdBytes != null) {
-                                    builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
-                                }
-                                builder.setCacheId(ByteStringer.wrap(cacheId));
-    							instance.removeServerCache(controller, builder.build(), rpcCallback);
-    							if(controller.getFailedOn() != null) {
-    								throw controller.getFailedOn();
-    							}
-    							return rpcCallback.get(); 
-    						}
-    					});
-    					remainingOnServers.remove(entry);
-    				} catch (Throwable t) {
-    					lastThrowable = t;
-    					LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, connection), t);
-    				}
-    			}
-    		}
-    		if (!remainingOnServers.isEmpty()) {
-    			LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, connection), lastThrowable);
-    		}
-    	} finally {
-    		Closeables.closeQuietly(iterateOverTable);
-    	}
+                                });
+                        remainingOnServers.remove(entry);
+                    } catch (Throwable t) {
+                        lastThrowable = t;
+                        LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, connection),
+                                t);
+                    }
+                }
+            }
+            if (!remainingOnServers.isEmpty()) {
+                LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, connection),
+                        lastThrowable);
+            }
+        } finally {
+            cacheUsingTableRefMap.remove(cacheId);
+            Closeables.closeQuietly(iterateOverTable);
+        }
     }
 
     /**
@@ -394,4 +409,77 @@ public class ServerCacheClient {
         }
         return regionStartKey;
     }
+
+    public boolean addServerCache(byte[] startkeyOfRegion, ServerCache cache, HashCacheFactory cacheFactory,
+             byte[] txState, PTable pTable) throws Exception {
+        HTableInterface table = null;
+        boolean success = true;
+        byte[] cacheId = cache.getId();
+        try {
+            ConnectionQueryServices services = connection.getQueryServices();
+            
+            byte[] tableName = pTable.getPhysicalName().getBytes();
+            table = services.getTable(tableName);
+            HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+			if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
+				success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
+						txState);
+			}
+			return success;
+        } finally {
+            Closeables.closeQuietly(table);
+        }
+    }
+    
+    public boolean addServerCache(HTableInterface htable, byte[] key, final PTable cacheUsingTable, final byte[] cacheId,
+            final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState)
+            throws Exception {
+        byte[] keyInRegion = getKeyInRegion(key);
+        final Map<byte[], AddServerCacheResponse> results;
+        try {
+            results = htable.coprocessorService(ServerCachingService.class, keyInRegion, keyInRegion,
+                    new Batch.Call<ServerCachingService, AddServerCacheResponse>() {
+                        @Override
+                        public AddServerCacheResponse call(ServerCachingService instance) throws IOException {
+                            ServerRpcController controller = new ServerRpcController();
+                            BlockingRpcCallback<AddServerCacheResponse> rpcCallback = new BlockingRpcCallback<AddServerCacheResponse>();
+                            AddServerCacheRequest.Builder builder = AddServerCacheRequest.newBuilder();
+                            final byte[] tenantIdBytes;
+                            if (cacheUsingTable.isMultiTenant()) {
+                                try {
+                                    tenantIdBytes = connection.getTenantId() == null ? null
+                                            : ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(),
+                                                    cacheUsingTable.getBucketNum() != null, connection.getTenantId(),
+                                                    cacheUsingTable.getViewIndexId() != null);
+                                } catch (SQLException e) {
+                                    throw new IOException(e);
+                                }
+                            } else {
+                                tenantIdBytes = connection.getTenantId() == null ? null
+                                        : connection.getTenantId().getBytes();
+                            }
+                            if (tenantIdBytes != null) {
+                                builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
+                            }
+                            builder.setCacheId(ByteStringer.wrap(cacheId));
+                            builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
+                            builder.setHasProtoBufIndexMaintainer(true);
+                            ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory
+                                    .newBuilder();
+                            svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
+                            builder.setCacheFactory(svrCacheFactoryBuider.build());
+                            builder.setTxState(ByteStringer.wrap(txState));
+                            instance.addServerCache(controller, builder.build(), rpcCallback);
+                            if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+                            return rpcCallback.get();
+                        }
+                    });
+        } catch (Throwable t) {
+            throw new Exception(t);
+        }
+        if (results != null && results.size() == 1) { return results.values().iterator().next().getReturn(); }
+        return false;
+
+    }
+    
 }
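
Note on the ServerCacheClient changes as a whole: the cache payload is now retained on the client after it is sent (in a memory chunk when the memory manager can grant one, otherwise spooled to a temp file under the configured QueryServices.SPOOL_DIRECTORY), the set of servers that received it is mutable, and the new addServerCache(byte[] startkeyOfRegion, ServerCache cache, HashCacheFactory cacheFactory, byte[] txState, PTable pTable) overload pushes the payload to a region server that was missed because the client's region metadata was stale. With hash.join.server.cache.resend.per.server set to true, the payload is resent even to servers the client believes already hold it. A minimal sketch of the resend a caller might perform when a region server reports the join cache missing (the surrounding variables and the retry itself are assumptions for illustration; the actual handling lives in the iterator/join code elsewhere in this commit):

    // Hedged sketch: resend the hash-join cache to the region that starts at
    // startKeyOfFailedRegion, then retry the scan against that region once.
    // serverCacheClient, serverCache, hashCacheFactory, txState and pTable are assumed to be
    // available from the join plan that originally built and shipped the cache.
    boolean resent = serverCacheClient.addServerCache(startKeyOfFailedRegion, serverCache,
            hashCacheFactory, txState, pTable);
    if (resent) {
        // re-issue the scan for the region that reported the missing cache
    }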

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 34361ac..83cc24e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.coprocessor;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;