You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2018/09/11 01:56:25 UTC

[8/8] phoenix git commit: PHOENIX-4666 Persistent subquery cache for hash joins

PHOENIX-4666 Persistent subquery cache for hash joins

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4e0405b7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4e0405b7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4e0405b7

Branch: refs/heads/4.x-HBase-1.2
Commit: 4e0405b75f39f7ae45aa4f6da417b4be3691efef
Parents: ade7d28
Author: Marcell Ortutay <ma...@gmail.com>
Authored: Thu Mar 29 12:59:03 2018 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Mon Sep 10 21:36:01 2018 -0400

----------------------------------------------------------------------
 .../end2end/join/HashJoinPersistentCacheIT.java | 167 +++++++++++++++
 .../org/apache/phoenix/cache/GlobalCache.java   |  22 +-
 .../apache/phoenix/cache/ServerCacheClient.java |  59 ++++--
 .../org/apache/phoenix/cache/TenantCache.java   |   2 +-
 .../apache/phoenix/cache/TenantCacheImpl.java   | 209 ++++++++++++++++---
 .../apache/phoenix/compile/QueryCompiler.java   |   9 +-
 .../phoenix/compile/StatementContext.java       |  21 +-
 .../coprocessor/HashJoinRegionScanner.java      |   4 +-
 .../coprocessor/ServerCachingEndpointImpl.java  |   2 +-
 .../generated/ServerCachingProtos.java          | 117 +++++++++--
 .../apache/phoenix/execute/HashJoinPlan.java    | 104 +++++++--
 .../phoenix/iterate/BaseResultIterators.java    |   8 +-
 .../phoenix/iterate/TableResultIterator.java    |   6 +-
 .../apache/phoenix/join/HashCacheClient.java    |  24 ++-
 .../apache/phoenix/join/HashCacheFactory.java   |  11 +
 .../java/org/apache/phoenix/parse/HintNode.java |   4 +
 .../org/apache/phoenix/query/QueryServices.java |   1 +
 .../phoenix/query/QueryServicesOptions.java     |   1 +
 .../apache/phoenix/cache/TenantCacheTest.java   | 112 ++++++++--
 .../src/main/ServerCachingService.proto         |   1 +
 phoenix-protocol/src/main/build-proto.sh        |   6 +
 21 files changed, 773 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java
new file mode 100644
index 0000000..2f072b8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.join;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.join.HashJoinCacheIT.InvalidateHashCache;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+public class HashJoinPersistentCacheIT extends BaseJoinIT {
+    /** Resolves the table as usual, then attaches the InvalidateHashCache coprocessor to it. */
+    @Override
+    protected String getTableName(Connection conn, String virtualName) throws Exception {
+        String realName = super.getTableName(conn, virtualName);
+        TestUtil.addCoprocessor(conn, SchemaUtil.normalizeFullTableName(realName),
+                InvalidateHashCache.class);
+        return realName;
+    }
+    /** Hinted joins must keep returning pre-upsert results; unhinted joins must see the upserts. */
+    @Test
+    public void testPersistentCache() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // closed even on failure
+            createTestTable(getUrl(),
+                    "CREATE TABLE IF NOT EXISTS states (state CHAR(2) " +
+                    "NOT NULL, name VARCHAR NOT NULL CONSTRAINT my_pk PRIMARY KEY (state, name))");
+            createTestTable(getUrl(),
+                    "CREATE TABLE IF NOT EXISTS cities (state CHAR(2) " +
+                     "NOT NULL, city VARCHAR NOT NULL, population BIGINT " +
+                      "CONSTRAINT my_pk PRIMARY KEY (state, city))");
+
+            conn.prepareStatement(
+                    "UPSERT INTO states VALUES ('CA', 'California')").executeUpdate();
+            conn.prepareStatement(
+                    "UPSERT INTO states VALUES ('AZ', 'Arizona')").executeUpdate();
+            conn.prepareStatement(
+                    "UPSERT INTO cities VALUES ('CA', 'San Francisco', 50000)").executeUpdate();
+            conn.prepareStatement(
+                    "UPSERT INTO cities VALUES ('CA', 'Sacramento', 3000)").executeUpdate();
+            conn.prepareStatement(
+                    "UPSERT INTO cities VALUES ('AZ', 'Phoenix', 20000)").executeUpdate();
+            conn.commit();
+
+            /* First, run query without using the persistent cache. This should return
+            * different results after an UPSERT takes place.
+            */
+            ResultSet rs = conn.prepareStatement(
+                    "SELECT SUM(population) FROM states s "
+                    +"JOIN cities c ON c.state = s.state").executeQuery();
+            rs.next();
+            int population1 = rs.getInt(1);
+
+            conn.prepareStatement("UPSERT INTO cities VALUES ('CA', 'Mt View', 1500)").executeUpdate();
+            conn.commit();
+            rs = conn.prepareStatement(
+                    "SELECT SUM(population) FROM states s " +
+                     "JOIN cities c ON c.state = s.state").executeQuery();
+            rs.next();
+            int population2 = rs.getInt(1);
+
+            assertEquals(73000, population1);
+            assertEquals(74500, population2);
+
+            /* Second, run query using the persistent cache. This should return the
+            * same results after an UPSERT takes place.
+            */
+            rs = conn.prepareStatement(
+                    "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) FROM states s " +
+                     "JOIN cities c ON c.state = s.state").executeQuery();
+            rs.next();
+            int population3 = rs.getInt(1);
+
+            conn.prepareStatement(
+                    "UPSERT INTO cities VALUES ('CA', 'Palo Alto', 2000)").executeUpdate();
+            conn.commit();
+
+            rs = conn.prepareStatement(
+                    "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) " +
+                     "FROM states s JOIN cities c ON c.state = s.state").executeQuery();
+            rs.next();
+            int population4 = rs.getInt(1);
+            rs = conn.prepareStatement(
+                    "SELECT SUM(population) FROM states s JOIN cities c ON c.state = s.state")
+                    .executeQuery();
+            rs.next();
+            int population5 = rs.getInt(1);
+
+            assertEquals(74500, population3);
+            assertEquals(74500, population4);
+            assertEquals(76500, population5);
+
+            /* Let's make sure caches can be used across queries. We'll set up a
+            * cache, and make sure it is used on two different queries with the
+            * same subquery.
+            */
+
+            String sumQueryCached = "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) " +
+                    "FROM cities c JOIN (SELECT state FROM states WHERE state LIKE 'C%') sq " +
+                    "ON sq.state = c.state";
+            String distinctQueryCached = "SELECT /*+ USE_PERSISTENT_CACHE */ " +
+                    "COUNT(DISTINCT(c.city)) FROM cities c " +
+                    "JOIN (SELECT state FROM states WHERE state LIKE 'C%') sq " +
+                    "ON sq.state = c.state";
+            String sumQueryUncached = sumQueryCached.replace(
+                    "/*+ USE_PERSISTENT_CACHE */", "");
+            String distinctQueryUncached = distinctQueryCached.replace(
+                    "/*+ USE_PERSISTENT_CACHE */", "");
+
+            rs = conn.prepareStatement(sumQueryCached).executeQuery();
+            rs.next();
+            int population6 = rs.getInt(1);
+            rs = conn.prepareStatement(distinctQueryCached).executeQuery();
+            rs.next();
+            int distinct1 = rs.getInt(1);
+            assertEquals(4, distinct1);
+
+            // Add a new state and city that match the subquery. This should not
+            // affect results using persistent caching.
+            conn.prepareStatement("UPSERT INTO states VALUES ('CO', 'Colorado')").executeUpdate();
+            conn.prepareStatement("UPSERT INTO cities VALUES ('CO', 'Denver', 6000)").executeUpdate();
+            conn.commit();
+
+            rs = conn.prepareStatement(sumQueryCached).executeQuery();
+            rs.next();
+            int population7 = rs.getInt(1);
+            assertEquals(population6, population7);
+            rs = conn.prepareStatement(distinctQueryCached).executeQuery();
+            rs.next();
+            int distinct2 = rs.getInt(1);
+            assertEquals(distinct1, distinct2);
+
+            // Finally, make sure uncached queries give up to date results
+            rs = conn.prepareStatement(sumQueryUncached).executeQuery();
+            rs.next();
+            int population8 = rs.getInt(1);
+            assertEquals(62500, population8);
+            rs = conn.prepareStatement(distinctQueryUncached).executeQuery();
+            rs.next();
+            int distinct3 = rs.getInt(1);
+            assertEquals(5, distinct3);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index ae77174..5f3e29b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -148,7 +148,12 @@ public class GlobalCache extends TenantCacheImpl {
     
     private GlobalCache(Configuration config) {
         super(new GlobalMemoryManager(getMaxMemorySize(config)),
-              config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
+              config.getInt(
+                      QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                      QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS),
+              config.getInt(
+                      QueryServices.MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB,
+                      QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS));
         this.config = config;
     }
     
@@ -164,9 +169,18 @@ public class GlobalCache extends TenantCacheImpl {
     public TenantCache getChildTenantCache(ImmutableBytesPtr tenantId) {
         TenantCache tenantCache = perTenantCacheMap.get(tenantId);
         if (tenantCache == null) {
-            int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
-            int maxServerCacheTimeToLive = config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
-            TenantCacheImpl newTenantCache = new TenantCacheImpl(new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc), maxServerCacheTimeToLive);
+            int maxTenantMemoryPerc = config.getInt(
+                    MAX_TENANT_MEMORY_PERC_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
+            int maxServerCacheTimeToLive = config.getInt(
+                    QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+            int maxServerCachePersistenceTimeToLive = config.getInt(
+                    QueryServices.MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS);
+            TenantCacheImpl newTenantCache = new TenantCacheImpl(
+                    new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc),
+                    maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive);
             tenantCache = perTenantCacheMap.putIfAbsent(tenantId, newTenantCache);
             if (tenantCache == null) {
                 tenantCache = newTenantCache;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 5e284bd..011a6f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
@@ -59,6 +61,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheFactory;
@@ -75,6 +78,8 @@ import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 
+import com.google.protobuf.ByteString;
+
 /**
  * 
  * Client for sending cache to each region server
@@ -215,22 +220,46 @@ public class ServerCacheClient {
                 }
             }
         }
-        
     }
-    
-    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
-            final ServerCacheFactory cacheFactory, final PTable cacheUsingTable) throws SQLException {
-        return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false);
+
+    public ServerCache createServerCache(byte[] cacheId, QueryPlan delegate)
+            throws SQLException, IOException { // builds a handle for cacheId; no cache payload is sent here
+        PTable cacheUsingTable = delegate.getTableRef().getTable();
+        ConnectionQueryServices services = delegate.getContext().getConnection().getQueryServices();
+        List<HRegionLocation> locations = services.getAllTableRegions(
+                cacheUsingTable.getPhysicalName().getBytes());
+        int nRegions = locations.size(); // only used as the initial capacity below
+        Set<HRegionLocation> servers = new HashSet<>(nRegions); // starts empty
+        cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable); // record the table for this cache id
+        return new ServerCache(cacheId, servers, new ImmutableBytesWritable(
+                new byte[]{}), services, false); // empty payload; NOTE(review): confirm meaning of trailing 'false'
     }
-    
-    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
-            final ServerCacheFactory cacheFactory, final PTable cacheUsingTable, boolean storeCacheOnClient)
+
+    public ServerCache addServerCache(
+            ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
+            final ServerCacheFactory cacheFactory, final PTable cacheUsingTable)
             throws SQLException {
+        return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false); // storeCacheOnClient = false
+    }
+
+    public ServerCache addServerCache(
+            ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
+            final ServerCacheFactory cacheFactory, final PTable cacheUsingTable,
+            boolean storeCacheOnClient) throws SQLException {
+        final byte[] cacheId = ServerCacheClient.generateId(); // this overload always uses a newly generated id
+        return addServerCache(keyRanges, cacheId, cachePtr, txState, cacheFactory,
+                cacheUsingTable, false, storeCacheOnClient); // usePersistentCache = false
+    }
+
+    public ServerCache addServerCache(
+            ScanRanges keyRanges, final byte[] cacheId, final ImmutableBytesWritable cachePtr,
+            final byte[] txState, final ServerCacheFactory cacheFactory,
+            final PTable cacheUsingTable, final boolean usePersistentCache,
+            boolean storeCacheOnClient) throws SQLException {
         ConnectionQueryServices services = connection.getQueryServices();
         List<Closeable> closeables = new ArrayList<Closeable>();
         ServerCache hashCacheSpec = null;
         SQLException firstException = null;
-        final byte[] cacheId = generateId();
         /**
          * Execute EndPoint in parallel on each server to send compressed hash cache 
          */
@@ -251,7 +280,7 @@ public class ServerCacheClient {
                 byte[] regionEndKey = entry.getRegionInfo().getEndKey();
                 if ( ! servers.contains(entry) && 
                         keyRanges.intersectRegion(regionStartKey, regionEndKey,
-                                cacheUsingTable.getIndexType() == IndexType.LOCAL)) {  
+                                cacheUsingTable.getIndexType() == IndexType.LOCAL)) {
                     // Call RPC once per server
                     servers.add(entry);
                     if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
@@ -262,7 +291,7 @@ public class ServerCacheClient {
                         
                         @Override
                         public Boolean call() throws Exception {
-                            return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState);
+                            return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState, usePersistentCache);
                         }
 
                         /**
@@ -291,7 +320,7 @@ public class ServerCacheClient {
             for (Future<Boolean> future : futures) {
                 future.get(timeoutMs, TimeUnit.MILLISECONDS);
             }
-            
+
             cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
             success = true;
         } catch (SQLException e) {
@@ -444,7 +473,7 @@ public class ServerCacheClient {
             }
 			if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
 				success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
-						txState);
+						txState, false);
 			}
 			return success;
         } finally {
@@ -453,7 +482,7 @@ public class ServerCacheClient {
     }
     
     public boolean addServerCache(HTableInterface htable, byte[] key, final PTable cacheUsingTable, final byte[] cacheId,
-            final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState)
+            final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState, final boolean usePersistentCache)
             throws Exception {
         byte[] keyInRegion = getKeyInRegion(key);
         final Map<byte[], AddServerCacheResponse> results;
@@ -483,6 +512,7 @@ public class ServerCacheClient {
                                 builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
                             }
                             builder.setCacheId(ByteStringer.wrap(cacheId));
+                            builder.setUsePersistentCache(usePersistentCache);
                             builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
                             builder.setHasProtoBufIndexMaintainer(true);
                             ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory
@@ -501,7 +531,6 @@ public class ServerCacheClient {
         }
         if (results != null && results.size() == 1) { return results.values().iterator().next().getReturn(); }
         return false;
-
     }
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index c4e82c2..e37d4d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager;
 public interface TenantCache {
     MemoryManager getMemoryManager();
     Closeable getServerCache(ImmutableBytesPtr cacheId);
-    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException;
+    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache, int clientVersion) throws SQLException;
     void removeServerCache(ImmutableBytesPtr cacheId);
     void removeAllServerCache();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 1dc59bc..dc4c9e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -18,16 +18,22 @@
 package org.apache.phoenix.cache;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.util.Closeables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Ticker;
 import com.google.common.cache.Cache;
@@ -44,18 +50,86 @@ import com.google.common.cache.RemovalNotification;
  * @since 0.1
  */
 public class TenantCacheImpl implements TenantCache {
+    private static final Logger logger = LoggerFactory.getLogger(TenantCacheImpl.class);
     private final int maxTimeToLiveMs;
+    private final int maxPersistenceTimeToLiveMs;
     private final MemoryManager memoryManager;
     private final Ticker ticker;
-    private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches;
 
-    public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) {
-        this(memoryManager, maxTimeToLiveMs, Ticker.systemTicker());
+    // Two caches exist: the "serverCaches" cache which is used for handling live
+    // queries, and the "persistentServerCaches" cache which is used to store data
+    // between queries. If we are out of memory, attempt to clear out entries from
+    // the persistent cache before throwing an exception.
+    private volatile Cache<ImmutableBytesPtr, CacheEntry> serverCaches;
+    private volatile Cache<ImmutableBytesPtr, CacheEntry> persistentServerCaches;
+
+    private final long EVICTION_MARGIN_BYTES = 10000000;
+
+    private static class CacheEntry implements Comparable<CacheEntry>, Closeable { // static: no outer state used
+        private ImmutableBytesPtr cacheId;        // key of this entry
+        private ImmutableBytesWritable cachePtr;  // bytes handed to cacheFactory.newCache(...)
+        private int hits;                         // bumped by every incrementLiveQueryCount()
+        private int liveQueriesCount;             // increments minus decrements
+        private boolean usePersistentCache;       // flag supplied by the caller
+        private long size;                        // cachePtr.getLength() at construction time
+        private Closeable closeable;              // object returned by cacheFactory.newCache(...)
+
+        public CacheEntry(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, 
+                ServerCacheFactory cacheFactory, byte[] txState, MemoryChunk chunk,
+                boolean usePersistentCache, boolean useProtoForIndexMaintainer,
+                int clientVersion) throws SQLException {
+            this.cacheId = cacheId;
+            this.cachePtr = cachePtr;
+            this.size = cachePtr.getLength();
+            this.hits = 0;
+            this.liveQueriesCount = 0;
+            this.usePersistentCache = usePersistentCache;
+            this.closeable = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer, clientVersion);
+        }
+
+        public void close() throws IOException { // closes the factory-built cache object
+            this.closeable.close();
+        }
+
+        synchronized public void incrementLiveQueryCount() {
+            liveQueriesCount++;
+            hits++; // every new live query also counts as a hit; hits is never decremented
+        }
+
+        synchronized public void decrementLiveQueryCount() {
+            liveQueriesCount--;
+        }
+
+        synchronized public boolean isLive() { // true while increments outnumber decrements
+            return liveQueriesCount > 0;
+        }
+
+        public boolean getUsePersistentCache() {
+            return usePersistentCache;
+        }
+
+        public ImmutableBytesPtr getCacheId() {
+            return cacheId;
+        }
+
+        private Float rank() { // ordering key used by compareTo: the hit count
+            return (float)hits;
+        }
+
+        @Override
+        public int compareTo(CacheEntry o) { // ascending by hit count
+            return rank().compareTo(o.rank());
+        }
+    }
+
+    public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, int maxPersistenceTimeToLiveMs) {
+        this(memoryManager, maxTimeToLiveMs, maxPersistenceTimeToLiveMs, Ticker.systemTicker()); // use the system ticker
     }
     
-    public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, Ticker ticker) {
+    public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, int maxPersistenceTimeToLiveMs, Ticker ticker) {
         this.memoryManager = memoryManager;
         this.maxTimeToLiveMs = maxTimeToLiveMs; // TTL handed to buildCache(...) for serverCaches
+        this.maxPersistenceTimeToLiveMs = maxPersistenceTimeToLiveMs; // TTL handed to buildCache(...) for persistentServerCaches
         this.ticker = ticker;
     }
     
@@ -69,6 +143,9 @@ public class TenantCacheImpl implements TenantCache {
             if (serverCaches != null) {
                 serverCaches.cleanUp();
             }
+            if (persistentServerCaches != null) {
+                persistentServerCaches.cleanUp();
+            }
         }
     }
     
@@ -77,57 +154,133 @@ public class TenantCacheImpl implements TenantCache {
         return memoryManager;
     }
 
-    private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
+    private Cache<ImmutableBytesPtr,CacheEntry> getServerCaches() {
         /* Delay creation of this map until it's needed */
         if (serverCaches == null) {
             synchronized(this) {
                 if (serverCaches == null) {
-                    serverCaches = CacheBuilder.newBuilder()
-                        .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
-                        .ticker(getTicker())
-                        .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
-                            @Override
-                            public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) {
-                                Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
-                            }
-                        })
-                        .build();
+                    serverCaches = buildCache(maxTimeToLiveMs, false);
                 }
             }
         }
         return serverCaches;
     }
-    
-    @Override
+
+    private Cache<ImmutableBytesPtr,CacheEntry> getPersistentServerCaches() {
+        /* Build the persistent cache lazily on first use. NOTE(review): the unsynchronized null check is only safe if the field is volatile - confirm. */
+        if (persistentServerCaches == null) {
+            synchronized(this) {
+                if (persistentServerCaches == null) {
+                    persistentServerCaches = buildCache(maxPersistenceTimeToLiveMs, true);
+                }
+            }
+        }
+        return persistentServerCaches;
+    }
+
+    private Cache<ImmutableBytesPtr, CacheEntry> buildCache(final int ttl, final boolean isPersistent) {
+        CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder().ticker(getTicker());
+        if (isPersistent) {
+            cacheBuilder.expireAfterWrite(ttl, TimeUnit.MILLISECONDS);
+        } else {
+            cacheBuilder.expireAfterAccess(ttl, TimeUnit.MILLISECONDS);
+        }
+        return cacheBuilder.removalListener(new RemovalListener<ImmutableBytesPtr, CacheEntry>(){
+                @Override
+                public void onRemoval(RemovalNotification<ImmutableBytesPtr, CacheEntry> notification) {
+                    CacheEntry removed = notification.getValue();
+                    // Entries flagged persistent are closed only when the persistent cache drops them.
+                    if (isPersistent || !removed.getUsePersistentCache()) {
+                        Closeables.closeAllQuietly(Collections.singletonList(removed));
+                    }
+                }
+            })
+            .build();
+    }
+
+    synchronized private void evictInactiveEntries(long bytesNeeded) {
+        logger.debug("Trying to evict inactive cache entries to free up " + bytesNeeded + " bytes");
+        CacheEntry[] candidates = getPersistentServerCaches().asMap().values().toArray(new CacheEntry[]{});
+        // NOTE(review): isLive() is never consulted here - confirm that evicting in-use entries is intended.
+        Arrays.sort(candidates); // ascending hit count, so the least-used entries are evicted first
+        long available = this.getMemoryManager().getAvailableMemory();
+        for (int idx = 0; idx < candidates.length && available < bytesNeeded; idx++) {
+            ImmutableBytesPtr cacheId = candidates[idx].getCacheId();
+            getPersistentServerCaches().invalidate(cacheId);
+            available = this.getMemoryManager().getAvailableMemory();
+            logger.debug("Evicted cache ID " + Bytes.toLong(cacheId.get()) + ", we now have " + available + " bytes available");
+        }
+    }
+
+    private CacheEntry getIfPresent(ImmutableBytesPtr cacheId) {
+        // The persistent cache takes precedence; the regular cache is only
+        // consulted when the persistent one has no entry for this ID.
+        CacheEntry persistentHit = getPersistentServerCaches().getIfPresent(cacheId);
+        return persistentHit != null ? persistentHit
+                : getServerCaches().getIfPresent(cacheId);
+    }
+
+    @Override
     public Closeable getServerCache(ImmutableBytesPtr cacheId) {
         getServerCaches().cleanUp();
-        return getServerCaches().getIfPresent(cacheId);
+        // Resolve through getIfPresent so persistent entries are found too, then
+        // unwrap the entry to the Closeable that callers expect.
+        CacheEntry entry = getIfPresent(cacheId);
+        // No entry means nothing is cached under this ID; signal that with null.
+        return entry == null ? null : entry.closeable;
     }
-    
+
     @Override
-    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException {
+    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache, int clientVersion) throws SQLException {
         getServerCaches().cleanUp();
-        MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
+        long available = this.getMemoryManager().getAvailableMemory();
+        int size = cachePtr.getLength() + txState.length;
+        if (size > available) {
+            evictInactiveEntries(size - available + EVICTION_MARGIN_BYTES);
+        }
+        MemoryChunk chunk = this.getMemoryManager().allocate(size);
         boolean success = false;
         try {
-            Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer, clientVersion);
-            getServerCaches().put(cacheId, element);
+            CacheEntry entry;
+            synchronized(this) {
+                entry = getIfPresent(cacheId);
+                if (entry == null) {
+                    entry = new CacheEntry(cacheId, cachePtr, cacheFactory, txState, chunk,
+                            usePersistentCache, useProtoForIndexMaintainer, clientVersion);
+                    getServerCaches().put(cacheId, entry);
+                    if (usePersistentCache) {
+                        getPersistentServerCaches().put(cacheId, entry);
+                    }
+                } else { // entry reused: release the chunk reserved above instead of leaking it
+                    Closeables.closeAllQuietly(Collections.singletonList(chunk));
+                }
+                entry.incrementLiveQueryCount();
+            }
             success = true;
-            return element;
+            return entry;
         } finally {
             if (!success) {
                 Closeables.closeAllQuietly(Collections.singletonList(chunk));
             }
-        }           
+        }
     }
-    
+
     @Override
-    public void removeServerCache(ImmutableBytesPtr cacheId) {
-        getServerCaches().invalidate(cacheId);
+    synchronized public void removeServerCache(ImmutableBytesPtr cacheId) {
+        // Drop one query's hold on the entry; evict it only after the last hold is gone.
+        CacheEntry entry = getServerCaches().getIfPresent(cacheId);
+        if (entry != null) {
+            entry.decrementLiveQueryCount();
+            if (!entry.isLive()) {
+                logger.debug("Cache ID " + Bytes.toLong(cacheId.get()) + " is no longer live, invalidate it");
+                getServerCaches().invalidate(cacheId);
+            }
+        }
     }
 
     @Override
     public void removeAllServerCache() {
         getServerCaches().invalidateAll();
+        getPersistentServerCaches().invalidateAll();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3e5f5ee..603da0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -100,6 +100,7 @@ public class QueryCompiler {
     private final SequenceManager sequenceManager;
     private final boolean projectTuples;
     private final boolean noChildParentJoinOptimization;
+    private final boolean usePersistentCache;
     private final boolean optimizeSubquery;
     private final Map<TableRef, QueryPlan> dataPlans;
     private final boolean costBased;
@@ -117,7 +118,8 @@ public class QueryCompiler {
         this.parallelIteratorFactory = parallelIteratorFactory;
         this.sequenceManager = sequenceManager;
         this.projectTuples = projectTuples;
-        this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION);
+        this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION) || select.getHint().hasHint(Hint.USE_PERSISTENT_CACHE);
+        this.usePersistentCache = select.getHint().hasHint(Hint.USE_PERSISTENT_CACHE);
         ConnectionQueryServices services = statement.getConnection().getQueryServices();
         this.costBased = services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
         scan.setLoadColumnFamiliesOnDemand(true);
@@ -314,7 +316,7 @@ public class QueryCompiler {
                     if (i < count - 1) {
                         fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
                     }
-                    hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+                    hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), usePersistentCache, keyRangeLhsExpression, keyRangeRhsExpression);
                 }
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
                 QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
@@ -381,9 +383,10 @@ public class QueryCompiler {
                 HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions},
                         new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true},
                         new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+                boolean usePersistentCache = joinTable.getStatement().getHint().hasHint(Hint.USE_PERSISTENT_CACHE);
                 Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
                 getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
-                return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
+                return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, usePersistentCache, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
             }
             case SORT_MERGE: {
                 JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index eb195c2..cc38870 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -74,6 +74,7 @@ public class StatementContext {
     private final ImmutableBytesWritable tempPtr;
     private final PhoenixStatement statement;
     private final Map<PColumn, Integer> dataColumns;
+    private Map<Long, Boolean> retryingPersistentCache;
 
     private long currentTime = QueryConstants.UNSET_TIMESTAMP;
     private ScanRanges scanRanges = ScanRanges.EVERYTHING;
@@ -138,6 +139,7 @@ public class StatementContext {
         this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
         this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled,connection.getLogLevel());
         this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled,connection.getLogLevel());
+        this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap();
     }
 
     /**
@@ -326,5 +328,22 @@ public class StatementContext {
     public void setClientSideUpsertSelect(boolean isClientSideUpsertSelect) {
         this.isClientSideUpsertSelect = isClientSideUpsertSelect;
     }
-    
+
+    /**
+     * Marks a cache ID as being retried, which overrides the USE_PERSISTENT_CACHE hint and
+     * disables the persistent cache for that ID. Intended for re-running a query that failed
+     * while relying on the persistent cache.
+     */
+    public void setRetryingPersistentCache(long cacheId) {
+        retryingPersistentCache.put(Long.valueOf(cacheId), Boolean.TRUE);
+    }
+
+    /**
+     * Reports whether the given cache ID has been flagged through
+     * {@link #setRetryingPersistentCache(long)}; IDs never flagged yield false.
+     */
+    public boolean getRetryingPersistentCache(long cacheId) {
+        Boolean flagged = retryingPersistentCache.get(cacheId);
+        return flagged != null && flagged;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index d82aaba..96af154 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -51,7 +51,6 @@ import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TupleUtil;
 
@@ -122,8 +121,7 @@ public class HashJoinRegionScanner implements RegionScanner {
             }
             HashCache hashCache = (HashCache)cache.getServerCache(joinId);
             if (hashCache == null) {
-                Exception cause = new HashJoinCacheNotFoundException(
-                        Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId)));
+                Exception cause = new HashJoinCacheNotFoundException(Bytes.toLong(joinId.get()));
                 throw new DoNotRetryIOException(cause.getMessage(), cause);
             }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 9d78659..86219c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -75,7 +75,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
           ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
           tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()),
               cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer(),
-              request.hasClientVersion() ? request.getClientVersion() : ScanUtil.UNKNOWN_CLIENT_VERSION);
+              request.getUsePersistentCache(), request.hasClientVersion() ? request.getClientVersion() : ScanUtil.UNKNOWN_CLIENT_VERSION);
         } catch (Throwable e) {
             ProtobufUtil.setControllerException(controller,
                 ServerUtil.createIOException("Error when adding cache: ", e));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index fdca334..c42b9df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -5760,6 +5760,16 @@ public final class ServerCachingProtos {
      * <code>optional int32 clientVersion = 7;</code>
      */
     int getClientVersion();
+
+    // optional bool usePersistentCache = 8;
+    /**
+     * <code>optional bool usePersistentCache = 8;</code>
+     */
+    boolean hasUsePersistentCache();
+    /**
+     * <code>optional bool usePersistentCache = 8;</code>
+     */
+    boolean getUsePersistentCache();
   }
   /**
    * Protobuf type {@code AddServerCacheRequest}
@@ -5863,6 +5873,11 @@ public final class ServerCachingProtos {
               clientVersion_ = input.readInt32();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              usePersistentCache_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6027,6 +6042,22 @@ public final class ServerCachingProtos {
       return clientVersion_;
     }
 
+    // optional bool usePersistentCache = 8;
+    public static final int USEPERSISTENTCACHE_FIELD_NUMBER = 8;
+    private boolean usePersistentCache_;
+    /**
+     * <code>optional bool usePersistentCache = 8;</code>
+     */
+    public boolean hasUsePersistentCache() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional bool usePersistentCache = 8;</code>
+     */
+    public boolean getUsePersistentCache() {
+      return usePersistentCache_;
+    }
+
     private void initFields() {
       tenantId_ = com.google.protobuf.ByteString.EMPTY;
       cacheId_ = com.google.protobuf.ByteString.EMPTY;
@@ -6035,6 +6066,7 @@ public final class ServerCachingProtos {
       txState_ = com.google.protobuf.ByteString.EMPTY;
       hasProtoBufIndexMaintainer_ = false;
       clientVersion_ = 0;
+      usePersistentCache_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6089,6 +6121,9 @@ public final class ServerCachingProtos {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeInt32(7, clientVersion_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeBool(8, usePersistentCache_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -6126,6 +6161,10 @@ public final class ServerCachingProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(7, clientVersion_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(8, usePersistentCache_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -6184,6 +6223,11 @@ public final class ServerCachingProtos {
         result = result && (getClientVersion()
             == other.getClientVersion());
       }
+      result = result && (hasUsePersistentCache() == other.hasUsePersistentCache());
+      if (hasUsePersistentCache()) {
+        result = result && (getUsePersistentCache()
+            == other.getUsePersistentCache());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -6225,6 +6269,10 @@ public final class ServerCachingProtos {
         hash = (37 * hash) + CLIENTVERSION_FIELD_NUMBER;
         hash = (53 * hash) + getClientVersion();
       }
+      if (hasUsePersistentCache()) {
+        hash = (37 * hash) + USEPERSISTENTCACHE_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getUsePersistentCache());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -6358,6 +6406,8 @@ public final class ServerCachingProtos {
         bitField0_ = (bitField0_ & ~0x00000020);
         clientVersion_ = 0;
         bitField0_ = (bitField0_ & ~0x00000040);
+        usePersistentCache_ = false;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -6422,6 +6472,10 @@ public final class ServerCachingProtos {
           to_bitField0_ |= 0x00000040;
         }
         result.clientVersion_ = clientVersion_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.usePersistentCache_ = usePersistentCache_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6459,6 +6513,9 @@ public final class ServerCachingProtos {
         if (other.hasClientVersion()) {
           setClientVersion(other.getClientVersion());
         }
+        if (other.hasUsePersistentCache()) {
+          setUsePersistentCache(other.getUsePersistentCache());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6914,6 +6971,39 @@ public final class ServerCachingProtos {
         return this;
       }
 
+      // optional bool usePersistentCache = 8;
+      private boolean usePersistentCache_ ;
+      /**
+       * <code>optional bool usePersistentCache = 8;</code>
+       */
+      public boolean hasUsePersistentCache() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional bool usePersistentCache = 8;</code>
+       */
+      public boolean getUsePersistentCache() {
+        return usePersistentCache_;
+      }
+      /**
+       * <code>optional bool usePersistentCache = 8;</code>
+       */
+      public Builder setUsePersistentCache(boolean value) {
+        bitField0_ |= 0x00000080;
+        usePersistentCache_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool usePersistentCache = 8;</code>
+       */
+      public Builder clearUsePersistentCache() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        usePersistentCache_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:AddServerCacheRequest)
     }
 
@@ -8723,22 +8813,23 @@ public final class ServerCachingProtos {
       "\timmutable\030\022 \002(\010\022&\n\021indexedColumnInfo\030\023 " +
       "\003(\0132\013.ColumnInfo\022\026\n\016encodingScheme\030\024 \002(\005" +
       "\022\036\n\026immutableStorageScheme\030\025 \002(\005\022\025\n\rview" +
-      "IndexType\030\026 \001(\005\"\334\001\n\025AddServerCacheReques" +
+      "IndexType\030\026 \001(\005\"\370\001\n\025AddServerCacheReques" +
       "t\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\022)\n\010" +
       "cachePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022" +
       ")\n\014cacheFactory\030\004 \002(\0132\023.ServerCacheFacto" +
       "ry\022\017\n\007txState\030\005 \001(\014\022\"\n\032hasProtoBufIndexM" +
-      "aintainer\030\006 \001(\010\022\025\n\rclientVersion\030\007 \001(\005\"(",
-      "\n\026AddServerCacheResponse\022\016\n\006return\030\001 \002(\010" +
-      "\"=\n\030RemoveServerCacheRequest\022\020\n\010tenantId" +
-      "\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerC" +
-      "acheResponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerC" +
-      "achingService\022A\n\016addServerCache\022\026.AddSer" +
-      "verCacheRequest\032\027.AddServerCacheResponse" +
-      "\022J\n\021removeServerCache\022\031.RemoveServerCach" +
-      "eRequest\032\032.RemoveServerCacheResponseBG\n(" +
-      "org.apache.phoenix.coprocessor.generated" +
-      "B\023ServerCachingProtosH\001\210\001\001\240\001\001"
+      "aintainer\030\006 \001(\010\022\025\n\rclientVersion\030\007 \001(\005\022\032",
+      "\n\022usePersistentCache\030\010 \001(\010\"(\n\026AddServerC" +
+      "acheResponse\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveSe" +
+      "rverCacheRequest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007ca" +
+      "cheId\030\002 \002(\014\"+\n\031RemoveServerCacheResponse" +
+      "\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerCachingServic" +
+      "e\022A\n\016addServerCache\022\026.AddServerCacheRequ" +
+      "est\032\027.AddServerCacheResponse\022J\n\021removeSe" +
+      "rverCache\022\031.RemoveServerCacheRequest\032\032.R" +
+      "emoveServerCacheResponseBG\n(org.apache.p" +
+      "hoenix.coprocessor.generatedB\023ServerCach",
+      "ingProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8774,7 +8865,7 @@ public final class ServerCachingProtos {
           internal_static_AddServerCacheRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_AddServerCacheRequest_descriptor,
-              new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", "HasProtoBufIndexMaintainer", "ClientVersion", });
+              new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", "HasProtoBufIndexMaintainer", "ClientVersion", "UsePersistentCache", });
           internal_static_AddServerCacheResponse_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_AddServerCacheResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index bfe089d..b5cd6b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -22,21 +22,23 @@ import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
 import static org.apache.phoenix.util.NumberUtil.add;
 import static org.apache.phoenix.util.NumberUtil.getMin;
 
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ExplainPlan;
@@ -46,6 +48,7 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
@@ -57,9 +60,7 @@ import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.iterate.FilterResultIterator;
-import org.apache.phoenix.iterate.ParallelScanGrouper;
-import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.*;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
@@ -86,9 +87,11 @@ import org.apache.phoenix.util.SQLCloseables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.phoenix.util.ServerUtil;
 
 public class HashJoinPlan extends DelegateQueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
+    private static final Random RANDOM = new Random();
 
     private final SelectStatement statement;
     private final HashJoinInfo joinInfo;
@@ -105,6 +108,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private Long estimatedBytes;
     private Long estimateInfoTs;
     private boolean getEstimatesCalled;
+    private boolean hasSubPlansWithPersistentCache;
     
     public static HashJoinPlan create(SelectStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException {
@@ -134,8 +138,12 @@ public class HashJoinPlan extends DelegateQueryPlan {
         this.recompileWhereClause = recompileWhereClause;
         this.tableRefs = Sets.newHashSetWithExpectedSize(subPlans.length + plan.getSourceRefs().size());
         this.tableRefs.addAll(plan.getSourceRefs());
+        this.hasSubPlansWithPersistentCache = false;
         for (SubPlan subPlan : subPlans) {
             tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs());
+            if (subPlan instanceof HashSubPlan && ((HashSubPlan)subPlan).usePersistentCache) {
+                this.hasSubPlansWithPersistentCache = true;
+            }
         }
         QueryServices services = plan.getContext().getConnection().getQueryServices();
         this.maxServerCacheTimeToLive = services.getProps().getInt(
@@ -214,7 +222,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
             SQLCloseables.closeAllQuietly(dependencies.values());
             throw firstException;
         }
-        
+
         Expression postFilter = null;
         boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty();
         if (recompileWhereClause || hasKeyRangeExpressions) {
@@ -241,8 +249,35 @@ public class HashJoinPlan extends DelegateQueryPlan {
         if (statement.getInnerSelectStatement() != null && postFilter != null) {
             iterator = new FilterResultIterator(iterator, postFilter);
         }
-        
-        return iterator;
+
+        if (hasSubPlansWithPersistentCache) {
+            return peekForPersistentCache(iterator, scanGrouper, scan);
+        } else {
+            return iterator;
+        }
+    }
+
+    private ResultIterator peekForPersistentCache(ResultIterator iterator, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+        // The persistent subquery cache is optimistic: it assumes the caches are already
+        // present on the region servers. We verify this by peeking at one result. On a
+        // HashJoinCacheNotFoundException we retry the query with the persistent cache
+        // disabled for that cache ID; a second miss for the same ID is rethrown.
+        PeekingResultIterator peeking = LookAheadResultIterator.wrap(iterator);
+        try {
+            peeking.peek();
+        } catch (Exception e) {
+            try {
+                throw ServerUtil.parseServerException(e);
+            } catch (HashJoinCacheNotFoundException e2) {
+                Long cacheId = e2.getCacheId();
+                if (delegate.getContext().getRetryingPersistentCache(cacheId)) {
+                    throw e2;
+                }
+                delegate.getContext().setRetryingPersistentCache(cacheId);
+                return iterator(scanGrouper, scan);
+            }
+        }
+        return peeking;
     }
 
     private Expression createKeyRangeExpression(Expression lhsExpression,
@@ -467,20 +502,29 @@ public class HashJoinPlan extends DelegateQueryPlan {
         private final QueryPlan plan;
         private final List<Expression> hashExpressions;
         private final boolean singleValueOnly;
+        private final boolean usePersistentCache;
         private final Expression keyRangeLhsExpression;
         private final Expression keyRangeRhsExpression;
+        private final MessageDigest digest;
         
         public HashSubPlan(int index, QueryPlan subPlan, 
                 List<Expression> hashExpressions,
                 boolean singleValueOnly,
+                boolean usePersistentCache,
                 Expression keyRangeLhsExpression, 
                 Expression keyRangeRhsExpression) {
             this.index = index;
             this.plan = subPlan;
             this.hashExpressions = hashExpressions;
             this.singleValueOnly = singleValueOnly;
+            this.usePersistentCache = usePersistentCache;
             this.keyRangeLhsExpression = keyRangeLhsExpression;
             this.keyRangeRhsExpression = keyRangeRhsExpression;
+            try {
+                this.digest = MessageDigest.getInstance("SHA-256");
+            } catch (NoSuchAlgorithmException e) {
+                throw new RuntimeException(e);
+            }
         }
 
         @Override
@@ -494,19 +538,37 @@ public class HashJoinPlan extends DelegateQueryPlan {
             if (hashExpressions != null) {
                 ResultIterator iterator = plan.iterator();
                 try {
-                    cache =
-                            parent.hashClient.addHashCache(ranges, iterator,
-                                plan.getEstimatedSize(), hashExpressions, singleValueOnly,
+                    final byte[] cacheId;
+                    String queryString = plan.getStatement().toString().replaceAll("\\$[0-9]+", "\\$");
+                    if (usePersistentCache) {
+                        cacheId = Arrays.copyOfRange(digest.digest(queryString.getBytes()), 0, 8);
+                        boolean retrying = parent.delegate.getContext().getRetryingPersistentCache(Bytes.toLong(cacheId));
+                        if (!retrying) {
+                            try {
+                                cache = parent.hashClient.createServerCache(cacheId, parent.delegate);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    } else {
+                        cacheId = Bytes.toBytes(RANDOM.nextLong());
+                    }
+                    LOG.debug("Using cache ID " + Hex.encodeHexString(cacheId) + " for " + queryString);
+                    if (cache == null) {
+                        LOG.debug("Making RPC to add cache " + Hex.encodeHexString(cacheId));
+                        cache = parent.hashClient.addHashCache(ranges, cacheId, iterator,
+                                plan.getEstimatedSize(), hashExpressions, singleValueOnly, usePersistentCache,
                                 parent.delegate.getTableRef().getTable(), keyRangeRhsExpression,
                                 keyRangeRhsValues);
-                    long endTime = System.currentTimeMillis();
-                    boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
-                    if (!isSet && (endTime
-                            - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
-                        LOG.warn(addCustomAnnotations(
-                            "Hash plan [" + index
-                                    + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.",
-                            parent.delegate.getContext().getConnection()));
+                        long endTime = System.currentTimeMillis();
+                        boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
+                        if (!isSet && (endTime
+                                - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
+                            LOG.warn(addCustomAnnotations(
+                                "Hash plan [" + index
+                                        + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.",
+                                parent.delegate.getContext().getConnection()));
+                        }
                     }
                 } finally {
                     iterator.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index d890383..2378175 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -1309,8 +1309,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                                     throw e2;
                                 }
                                 Long cacheId = ((HashJoinCacheNotFoundException)e2).getCacheId();
-                                if (!hashCacheClient.addHashCacheToServer(startKey,
-                                        caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))), plan.getTableRef().getTable())) { throw e2; }
+                                ServerCache cache = caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
+                                if (cache .getCachePtr() != null) {
+                                    if (!hashCacheClient.addHashCacheToServer(startKey, cache, plan.getTableRef().getTable())) {
+                                        throw e2;
+                                    }
+                                }
                             }
                             concatIterators =
                                     recreateIterators(services, isLocalIndex, allIterators,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 06f612a..f1d1663 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -194,13 +194,11 @@ public class TableResultIterator implements ResultIterator {
                             if (retry <= 0) {
                                 throw e1;
                             }
+                            Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
                             retry--;
                             try {
-                                Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
-
                                 ServerCache cache = caches == null ? null :
-                                    caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
-
+                                        caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
                                 if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
                                         cache, plan.getTableRef().getTable())) {
                                     throw e1;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 83ac32d..315c515 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
@@ -67,6 +68,20 @@ public class HashCacheClient  {
     }
 
     /**
+     * Creates a ServerCache object for cacheId. This is used for the persistent cache, and there may or may
+     * not be corresponding data on each region server.
+     * @param cacheId ID for the cache entry
+     * @param delegate the query plan this will be used for
+     * @return client-side {@link ServerCache} representing the hash cache that may or may not be present on region servers.
+     * @throws SQLException propagated from {@code serverCache.createServerCache}
+     * @throws IOException propagated from {@code serverCache.createServerCache}
+     */
+    public ServerCache createServerCache(final byte[] cacheId, QueryPlan delegate)
+            throws SQLException, IOException {
+        return serverCache.createServerCache(cacheId, delegate);
+    }
+
+    /**
      * Send the results of scanning through the scanner to all
      * region servers for regions of the table that will use the cache
      * that intersect with the minMaxKeyRange.
@@ -76,13 +91,16 @@ public class HashCacheClient  {
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, PTable cacheUsingTable, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
+    public ServerCache addHashCache(
+            ScanRanges keyRanges, byte[] cacheId, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions,
+            boolean singleValueOnly, boolean usePersistentCache, PTable cacheUsingTable, Expression keyRangeRhsExpression,
+            List<Expression> keyRangeRhsValues) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
-        ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, true);
+        ServerCache cache = serverCache.addServerCache(keyRanges, cacheId, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, usePersistentCache, true);
         return cache;
     }
     
@@ -90,7 +108,7 @@ public class HashCacheClient  {
      * Should only be used to resend the hash table cache to the regionserver.
      *  
      * @param startkeyOfRegion start key of any region hosted on a regionserver which needs hash cache
-     * @param cacheId Id of the cache which needs to be sent
+     * @param cache The cache which needs to be sent
      * @param pTable
      * @return
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 4fc3c70..ecf9d57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import net.jcip.annotations.Immutable;
 
@@ -139,6 +140,16 @@ public class HashCacheFactory implements ServerCacheFactory {
         }
 
         @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            Set<ImmutableBytesPtr> keySet = hashCache.keySet();
+            for (ImmutableBytesPtr key : keySet) {
+                sb.append("key: " + key + " value: " + hashCache.get(key));
+            }
+            return sb.toString();
+        }
+
+        @Override
         public void close() {
             memoryChunk.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index 02a44ad..8a83116 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -83,6 +83,10 @@ public class HintNode {
         */
        USE_SORT_MERGE_JOIN,
        /**
+        * Persist the RHS results of a hash join.
+        */
+       USE_PERSISTENT_CACHE,
+       /**
         * Avoid using star-join optimization. Used for broadcast join (hash join) only.
         */
        NO_STAR_JOIN,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index d681a13..d1b277a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -90,6 +90,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
     public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes";
     public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
+    public static final String MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCachePersistenceTimeToLiveMs";
     
     @Deprecated // Use FORCE_ROW_KEY_ORDER instead.
     public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB  = "phoenix.query.rowKeyOrderSaltedTable";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 76e79fa..35dbe3a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -159,6 +159,7 @@ public class QueryServicesOptions {
     public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152;
 	// The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be.
     public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; // 30 sec (with no activity)
+    public static final int DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS = 30 * 60000; // 30 minutes
     public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
     public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = DEFAULT_MAX_QUERY_CONCURRENCY;
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
index c741f4e..3c8a269 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.phoenix.cache;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
 import java.io.Closeable;
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -38,49 +35,53 @@ import org.junit.Test;
 
 import com.google.common.base.Ticker;
 
+import static org.junit.Assert.*;
+
 public class TenantCacheTest {
 
     @Test
     public void testInvalidateClosesMemoryChunk() throws SQLException {
         int maxServerCacheTimeToLive = 10000;
+        int maxServerCachePersistenceTimeToLive = 10;
         long maxBytes = 1000;
         GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
-        TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive);
-        ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a"));
+        TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive);
+        ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes(1L));
         ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
-        newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+        newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
         newTenantCache.removeServerCache(cacheId);
         assertEquals(maxBytes, memoryManager.getAvailableMemory());
     }
-    
+
     @Test
     public void testTimeoutClosesMemoryChunk() throws Exception {
         int maxServerCacheTimeToLive = 10;
+        int maxServerCachePersistenceTimeToLive = 10;
         long maxBytes = 1000;
         GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
         ManualTicker ticker = new ManualTicker();
-        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
-        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
+        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
         ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
         ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
         cache.cleanUp();
         assertEquals(maxBytes, memoryManager.getAvailableMemory());
     }
 
-
     @Test
     public void testFreeMemoryOnAccess() throws Exception {
         int maxServerCacheTimeToLive = 10;
+        int maxServerCachePersistenceTimeToLive = 10;
         long maxBytes = 1000;
         GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
         ManualTicker ticker = new ManualTicker();
-        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
-        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
+        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
         ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
         ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
         assertNull(cache.getServerCache(cacheId1));
@@ -90,17 +91,92 @@ public class TenantCacheTest {
     @Test
     public void testExpiredCacheOnAddingNew() throws Exception {
         int maxServerCacheTimeToLive = 10;
+        int maxServerCachePersistenceTimeToLive = 10;
         long maxBytes = 10;
         GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
         ManualTicker ticker = new ManualTicker();
-        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
-        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
+        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
         ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("12345678"));
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(2, memoryManager.getAvailableMemory());
         ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
+        assertEquals(2, memoryManager.getAvailableMemory());
+    }
+
+    @Test
+    public void testExpiresButStaysInPersistentAfterTimeout() throws Exception {
+        int maxServerCacheTimeToLive = 100;
+        int maxServerCachePersistenceTimeToLive = 1000;
+        long maxBytes = 1000;
+        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
+        ManualTicker ticker = new ManualTicker();
+        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
+        ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION);
+        assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
+        assertNotNull(cache.getServerCache(cacheId1));
+
+        // Expire it from live cache but not persistent cache
+        ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
+        cache.cleanUp();
+        assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
+        assertNotNull(cache.getServerCache(cacheId1));
+
+        // Expire it from persistent cache as well
+        ticker.time += (maxServerCachePersistenceTimeToLive + 1) * 1000000;
+        cache.cleanUp();
+        assertEquals(maxBytes, memoryManager.getAvailableMemory());
+        assertNull(cache.getServerCache(cacheId1));
+    }
+
+    @Test
+    public void testExpiresButStaysInPersistentAfterRemove() throws Exception {
+        int maxServerCacheTimeToLive = 100;
+        int maxServerCachePersistenceTimeToLive = 1000;
+        long maxBytes = 1000;
+        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
+        ManualTicker ticker = new ManualTicker();
+        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
+        ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("12"));
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION);
+        assertEquals(maxBytes-2, memoryManager.getAvailableMemory());
+        assertNotNull(cache.getServerCache(cacheId1));
+
+        // Remove should only remove from live cache
+        cache.removeServerCache(cacheId1);
+        assertEquals(maxBytes-2, memoryManager.getAvailableMemory());
+        assertNotNull(cache.getServerCache(cacheId1));
+    }
+
+    @Test
+    public void testEvictPersistentCacheIfSpaceIsNeeded() throws Exception {
+        int maxServerCacheTimeToLive = 100;
+        int maxServerCachePersistenceTimeToLive = 1000;
+        long maxBytes = 10;
+        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
+        ManualTicker ticker = new ManualTicker();
+        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
+        ImmutableBytesWritable cachePtr1 = new ImmutableBytesWritable(Bytes.toBytes("1234"));
+        cache.addServerCache(cacheId1, cachePtr1, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION);
+        assertEquals(6, memoryManager.getAvailableMemory());
+
+        // Remove it, but it should stay in persistent cache
+        cache.removeServerCache(cacheId1);
+        assertNotNull(cache.getServerCache(cacheId1));
+        assertEquals(6, memoryManager.getAvailableMemory());
+
+        // Let's do an entry that will require eviction
+        ImmutableBytesPtr cacheId2 = new ImmutableBytesPtr(Bytes.toBytes(2L));
+        ImmutableBytesWritable cachePtr2 = new ImmutableBytesWritable(Bytes.toBytes("12345678"));
+        cache.addServerCache(cacheId2, cachePtr2, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(2, memoryManager.getAvailableMemory());
+        assertNull(cache.getServerCache(cacheId1));
+        assertNotNull(cache.getServerCache(cacheId2));
     }
 
     public static class ManualTicker extends Ticker {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-protocol/src/main/ServerCachingService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto
index d92f2cd..0d2d1d2 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -73,6 +73,7 @@ message AddServerCacheRequest {
   optional bytes txState = 5;
   optional bool hasProtoBufIndexMaintainer = 6;
   optional int32 clientVersion = 7;
+  optional bool usePersistentCache = 8;
 }
 
 message AddServerCacheResponse {