You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2018/09/11 01:56:25 UTC
[8/8] phoenix git commit: PHOENIX-4666 Persistent subquery cache for
hash joins
PHOENIX-4666 Persistent subquery cache for hash joins
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4e0405b7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4e0405b7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4e0405b7
Branch: refs/heads/4.x-HBase-1.2
Commit: 4e0405b75f39f7ae45aa4f6da417b4be3691efef
Parents: ade7d28
Author: Marcell Ortutay <ma...@gmail.com>
Authored: Thu Mar 29 12:59:03 2018 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Mon Sep 10 21:36:01 2018 -0400
----------------------------------------------------------------------
.../end2end/join/HashJoinPersistentCacheIT.java | 167 +++++++++++++++
.../org/apache/phoenix/cache/GlobalCache.java | 22 +-
.../apache/phoenix/cache/ServerCacheClient.java | 59 ++++--
.../org/apache/phoenix/cache/TenantCache.java | 2 +-
.../apache/phoenix/cache/TenantCacheImpl.java | 209 ++++++++++++++++---
.../apache/phoenix/compile/QueryCompiler.java | 9 +-
.../phoenix/compile/StatementContext.java | 21 +-
.../coprocessor/HashJoinRegionScanner.java | 4 +-
.../coprocessor/ServerCachingEndpointImpl.java | 2 +-
.../generated/ServerCachingProtos.java | 117 +++++++++--
.../apache/phoenix/execute/HashJoinPlan.java | 104 +++++++--
.../phoenix/iterate/BaseResultIterators.java | 8 +-
.../phoenix/iterate/TableResultIterator.java | 6 +-
.../apache/phoenix/join/HashCacheClient.java | 24 ++-
.../apache/phoenix/join/HashCacheFactory.java | 11 +
.../java/org/apache/phoenix/parse/HintNode.java | 4 +
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../phoenix/query/QueryServicesOptions.java | 1 +
.../apache/phoenix/cache/TenantCacheTest.java | 112 ++++++++--
.../src/main/ServerCachingService.proto | 1 +
phoenix-protocol/src/main/build-proto.sh | 6 +
21 files changed, 773 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java
new file mode 100644
index 0000000..2f072b8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.join;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.join.HashJoinCacheIT.InvalidateHashCache;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+public class HashJoinPersistentCacheIT extends BaseJoinIT {
+
+ @Override
+ protected String getTableName(Connection conn, String virtualName) throws Exception {
+ String realName = super.getTableName(conn, virtualName);
+ TestUtil.addCoprocessor(conn, SchemaUtil.normalizeFullTableName(realName),
+ InvalidateHashCache.class);
+ return realName;
+ }
+
+ @Test
+ public void testPersistentCache() throws Exception { // end-to-end check of the USE_PERSISTENT_CACHE hint on hash joins
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+
+ createTestTable(getUrl(),
+ "CREATE TABLE IF NOT EXISTS states (state CHAR(2) " +
+ "NOT NULL, name VARCHAR NOT NULL CONSTRAINT my_pk PRIMARY KEY (state, name))");
+ createTestTable(getUrl(),
+ "CREATE TABLE IF NOT EXISTS cities (state CHAR(2) " +
+ "NOT NULL, city VARCHAR NOT NULL, population BIGINT " +
+ "CONSTRAINT my_pk PRIMARY KEY (state, city))");
+
+ conn.prepareStatement(
+ "UPSERT INTO states VALUES ('CA', 'California')").executeUpdate();
+ conn.prepareStatement(
+ "UPSERT INTO states VALUES ('AZ', 'Arizona')").executeUpdate();
+ conn.prepareStatement(
+ "UPSERT INTO cities VALUES ('CA', 'San Francisco', 50000)").executeUpdate();
+ conn.prepareStatement(
+ "UPSERT INTO cities VALUES ('CA', 'Sacramento', 3000)").executeUpdate();
+ conn.prepareStatement(
+ "UPSERT INTO cities VALUES ('AZ', 'Phoenix', 20000)").executeUpdate();
+ conn.commit();
+
+ /* First, run query without using the persistent cache. This should return
+ * different results after an UPSERT takes place.
+ */
+ ResultSet rs = conn.prepareStatement(
+ "SELECT SUM(population) FROM states s "
+ +"JOIN cities c ON c.state = s.state").executeQuery();
+ rs.next();
+ int population1 = rs.getInt(1);
+
+ conn.prepareStatement("UPSERT INTO cities VALUES ('CA', 'Mt View', 1500)").executeUpdate();
+ conn.commit();
+ rs = conn.prepareStatement(
+ "SELECT SUM(population) FROM states s " +
+ "JOIN cities c ON c.state = s.state").executeQuery();
+ rs.next();
+ int population2 = rs.getInt(1);
+
+ assertEquals(73000, population1);
+ assertEquals(74500, population2);
+
+ /* Second, run query using the persistent cache. This should return the
+ * same results after an UPSERT takes place.
+ */
+ rs = conn.prepareStatement(
+ "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) FROM states s " +
+ "JOIN cities c ON c.state = s.state").executeQuery();
+ rs.next();
+ int population3 = rs.getInt(1);
+
+ conn.prepareStatement(
+ "UPSERT INTO cities VALUES ('CA', 'Palo Alto', 2000)").executeUpdate();
+ conn.commit();
+
+ rs = conn.prepareStatement(
+ "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) " +
+ "FROM states s JOIN cities c ON c.state = s.state").executeQuery();
+ rs.next();
+ int population4 = rs.getInt(1);
+ rs = conn.prepareStatement(
+ "SELECT SUM(population) FROM states s JOIN cities c ON c.state = s.state")
+ .executeQuery();
+ rs.next();
+ int population5 = rs.getInt(1);
+
+ assertEquals(74500, population3);
+ assertEquals(74500, population4);
+ assertEquals(76500, population5);
+
+ /* Let's make sure caches can be used across queries. We'll set up a
+ * cache, and make sure it is used on two different queries with the
+ * same subquery.
+ */
+
+ String sumQueryCached = "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) " +
+ "FROM cities c JOIN (SELECT state FROM states WHERE state LIKE 'C%') sq " +
+ "ON sq.state = c.state";
+ String distinctQueryCached = "SELECT /*+ USE_PERSISTENT_CACHE */ " +
+ "COUNT(DISTINCT(c.city)) FROM cities c " +
+ "JOIN (SELECT state FROM states WHERE state LIKE 'C%') sq " +
+ "ON sq.state = c.state";
+ String sumQueryUncached = sumQueryCached.replace(
+ "/*+ USE_PERSISTENT_CACHE */", "");
+ String distinctQueryUncached = distinctQueryCached.replace(
+ "/*+ USE_PERSISTENT_CACHE */", "");
+
+ rs = conn.prepareStatement(sumQueryCached).executeQuery();
+ rs.next();
+ int population6 = rs.getInt(1);
+ rs = conn.prepareStatement(distinctQueryCached).executeQuery();
+ rs.next();
+ int distinct1 = rs.getInt(1);
+ assertEquals(4, distinct1);
+
+ // Add a new state and city that match the subquery's LIKE 'C%' filter. This
+ // should not affect results using persistent caching.
+ conn.prepareStatement("UPSERT INTO states VALUES ('CO', 'Colorado')").executeUpdate();
+ conn.prepareStatement("UPSERT INTO cities VALUES ('CO', 'Denver', 6000)").executeUpdate();
+ conn.commit();
+
+ rs = conn.prepareStatement(sumQueryCached).executeQuery();
+ rs.next();
+ int population7 = rs.getInt(1);
+ assertEquals(population6, population7);
+ rs = conn.prepareStatement(distinctQueryCached).executeQuery();
+ rs.next();
+ int distinct2 = rs.getInt(1);
+ assertEquals(distinct1, distinct2);
+
+ // Finally, make sure uncached queries give up to date results
+ rs = conn.prepareStatement(sumQueryUncached).executeQuery();
+ rs.next();
+ int population8 = rs.getInt(1);
+ assertEquals(62500, population8); // expected value first, so a failure message reads correctly
+ rs = conn.prepareStatement(distinctQueryUncached).executeQuery();
+ rs.next();
+ int distinct3 = rs.getInt(1);
+ assertEquals(5, distinct3);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index ae77174..5f3e29b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -148,7 +148,12 @@ public class GlobalCache extends TenantCacheImpl {
private GlobalCache(Configuration config) {
super(new GlobalMemoryManager(getMaxMemorySize(config)),
- config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
+ config.getInt(
+ QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS),
+ config.getInt(
+ QueryServices.MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS));
this.config = config;
}
@@ -164,9 +169,18 @@ public class GlobalCache extends TenantCacheImpl {
public TenantCache getChildTenantCache(ImmutableBytesPtr tenantId) {
TenantCache tenantCache = perTenantCacheMap.get(tenantId);
if (tenantCache == null) {
- int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
- int maxServerCacheTimeToLive = config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
- TenantCacheImpl newTenantCache = new TenantCacheImpl(new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc), maxServerCacheTimeToLive);
+ int maxTenantMemoryPerc = config.getInt(
+ MAX_TENANT_MEMORY_PERC_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
+ int maxServerCacheTimeToLive = config.getInt(
+ QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+ int maxServerCachePersistenceTimeToLive = config.getInt(
+ QueryServices.MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS);
+ TenantCacheImpl newTenantCache = new TenantCacheImpl(
+ new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc),
+ maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive);
tenantCache = perTenantCacheMap.putIfAbsent(tenantId, newTenantCache);
if (tenantCache == null) {
tenantCache = newTenantCache;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 5e284bd..011a6f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
@@ -59,6 +61,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
+import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheFactory;
@@ -75,6 +78,8 @@ import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;
+import com.google.protobuf.ByteString;
+
/**
*
* Client for sending cache to each region server
@@ -215,22 +220,46 @@ public class ServerCacheClient {
}
}
}
-
}
-
- public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
- final ServerCacheFactory cacheFactory, final PTable cacheUsingTable) throws SQLException {
- return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false);
+
+ public ServerCache createServerCache(byte[] cacheId, QueryPlan delegate)
+ throws SQLException, IOException {
+ PTable cacheUsingTable = delegate.getTableRef().getTable();
+ ConnectionQueryServices services = delegate.getContext().getConnection().getQueryServices();
+ List<HRegionLocation> locations = services.getAllTableRegions(
+ cacheUsingTable.getPhysicalName().getBytes());
+ int nRegions = locations.size();
+ Set<HRegionLocation> servers = new HashSet<>(nRegions);
+ cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
+ return new ServerCache(cacheId, servers, new ImmutableBytesWritable(
+ new byte[]{}), services, false);
}
-
- public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
- final ServerCacheFactory cacheFactory, final PTable cacheUsingTable, boolean storeCacheOnClient)
+
+ public ServerCache addServerCache(
+ ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
+ final ServerCacheFactory cacheFactory, final PTable cacheUsingTable)
throws SQLException {
+ return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false);
+ }
+
+ public ServerCache addServerCache(
+ ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
+ final ServerCacheFactory cacheFactory, final PTable cacheUsingTable,
+ boolean storeCacheOnClient) throws SQLException {
+ final byte[] cacheId = ServerCacheClient.generateId();
+ return addServerCache(keyRanges, cacheId, cachePtr, txState, cacheFactory,
+ cacheUsingTable, false, storeCacheOnClient);
+ }
+
+ public ServerCache addServerCache(
+ ScanRanges keyRanges, final byte[] cacheId, final ImmutableBytesWritable cachePtr,
+ final byte[] txState, final ServerCacheFactory cacheFactory,
+ final PTable cacheUsingTable, final boolean usePersistentCache,
+ boolean storeCacheOnClient) throws SQLException {
ConnectionQueryServices services = connection.getQueryServices();
List<Closeable> closeables = new ArrayList<Closeable>();
ServerCache hashCacheSpec = null;
SQLException firstException = null;
- final byte[] cacheId = generateId();
/**
* Execute EndPoint in parallel on each server to send compressed hash cache
*/
@@ -251,7 +280,7 @@ public class ServerCacheClient {
byte[] regionEndKey = entry.getRegionInfo().getEndKey();
if ( ! servers.contains(entry) &&
keyRanges.intersectRegion(regionStartKey, regionEndKey,
- cacheUsingTable.getIndexType() == IndexType.LOCAL)) {
+ cacheUsingTable.getIndexType() == IndexType.LOCAL)) {
// Call RPC once per server
servers.add(entry);
if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
@@ -262,7 +291,7 @@ public class ServerCacheClient {
@Override
public Boolean call() throws Exception {
- return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState);
+ return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState, usePersistentCache);
}
/**
@@ -291,7 +320,7 @@ public class ServerCacheClient {
for (Future<Boolean> future : futures) {
future.get(timeoutMs, TimeUnit.MILLISECONDS);
}
-
+
cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
success = true;
} catch (SQLException e) {
@@ -444,7 +473,7 @@ public class ServerCacheClient {
}
if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
- txState);
+ txState, false);
}
return success;
} finally {
@@ -453,7 +482,7 @@ public class ServerCacheClient {
}
public boolean addServerCache(HTableInterface htable, byte[] key, final PTable cacheUsingTable, final byte[] cacheId,
- final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState)
+ final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState, final boolean usePersistentCache)
throws Exception {
byte[] keyInRegion = getKeyInRegion(key);
final Map<byte[], AddServerCacheResponse> results;
@@ -483,6 +512,7 @@ public class ServerCacheClient {
builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
}
builder.setCacheId(ByteStringer.wrap(cacheId));
+ builder.setUsePersistentCache(usePersistentCache);
builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
builder.setHasProtoBufIndexMaintainer(true);
ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory
@@ -501,7 +531,6 @@ public class ServerCacheClient {
}
if (results != null && results.size() == 1) { return results.values().iterator().next().getReturn(); }
return false;
-
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index c4e82c2..e37d4d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager;
public interface TenantCache {
MemoryManager getMemoryManager();
Closeable getServerCache(ImmutableBytesPtr cacheId);
- Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException;
+ Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache, int clientVersion) throws SQLException;
void removeServerCache(ImmutableBytesPtr cacheId);
void removeAllServerCache();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 1dc59bc..dc4c9e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -18,16 +18,22 @@
package org.apache.phoenix.cache;
import java.io.Closeable;
+import java.io.IOException;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.util.Closeables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
@@ -44,18 +50,86 @@ import com.google.common.cache.RemovalNotification;
* @since 0.1
*/
public class TenantCacheImpl implements TenantCache {
+ private static final Logger logger = LoggerFactory.getLogger(TenantCacheImpl.class);
private final int maxTimeToLiveMs;
+ private final int maxPersistenceTimeToLiveMs;
private final MemoryManager memoryManager;
private final Ticker ticker;
- private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches;
- public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) {
- this(memoryManager, maxTimeToLiveMs, Ticker.systemTicker());
+ // Two caches exist: the "serverCaches" cache which is used for handling live
+ // queries, and the "persistentServerCaches" cache which is used to store data
+ // between queries. If we are out of memory, attempt to clear out entries from
+ // the persistent cache before throwing an exception.
+ private volatile Cache<ImmutableBytesPtr, CacheEntry> serverCaches;
+ private volatile Cache<ImmutableBytesPtr, CacheEntry> persistentServerCaches;
+
+ private final long EVICTION_MARGIN_BYTES = 10000000;
+
+ private static class CacheEntry implements Comparable<CacheEntry>, Closeable { // static: never touches the enclosing TenantCacheImpl
+ private final ImmutableBytesPtr cacheId;
+ private final ImmutableBytesWritable cachePtr;
+ private int hits; // bumped on every incrementLiveQueryCount(); never decremented
+ private int liveQueriesCount; // attachments not yet released via decrementLiveQueryCount()
+ private final boolean usePersistentCache;
+ private final long size; // cachePtr.getLength() captured at construction
+ private final Closeable closeable; // cache object produced by the factory; closed together with this entry
+
+ public CacheEntry(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr,
+ ServerCacheFactory cacheFactory, byte[] txState, MemoryChunk chunk,
+ boolean usePersistentCache, boolean useProtoForIndexMaintainer,
+ int clientVersion) throws SQLException {
+ this.cacheId = cacheId;
+ this.cachePtr = cachePtr;
+ this.size = cachePtr.getLength();
+ this.hits = 0;
+ this.liveQueriesCount = 0;
+ this.usePersistentCache = usePersistentCache;
+ this.closeable = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer, clientVersion);
+ }
+
+ public void close() throws IOException { // delegates to the wrapped cache object
+ this.closeable.close();
+ }
+
+ synchronized public void incrementLiveQueryCount() {
+ liveQueriesCount++;
+ hits++;
+ }
+
+ synchronized public void decrementLiveQueryCount() {
+ liveQueriesCount--;
+ }
+
+ synchronized public boolean isLive() {
+ return liveQueriesCount > 0;
+ }
+
+ public boolean getUsePersistentCache() {
+ return usePersistentCache;
+ }
+
+ public ImmutableBytesPtr getCacheId() {
+ return cacheId;
+ }
+
+ private Float rank() { // ranking key used by compareTo: the hit count as a float
+ return (float)hits;
+ }
+
+ @Override
+ public int compareTo(CacheEntry o) { // ascending by rank, so fewer hits sorts first
+ return rank().compareTo(o.rank());
+ }
+ }
+
+ public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, int maxPersistenceTimeToLiveMs) {
+ this(memoryManager, maxTimeToLiveMs, maxPersistenceTimeToLiveMs, Ticker.systemTicker());
}
- public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, Ticker ticker) {
+ public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, int maxPersistenceTimeToLiveMs, Ticker ticker) {
this.memoryManager = memoryManager;
this.maxTimeToLiveMs = maxTimeToLiveMs;
+ this.maxPersistenceTimeToLiveMs = maxPersistenceTimeToLiveMs;
this.ticker = ticker;
}
@@ -69,6 +143,9 @@ public class TenantCacheImpl implements TenantCache {
if (serverCaches != null) {
serverCaches.cleanUp();
}
+ if (persistentServerCaches != null) {
+ persistentServerCaches.cleanUp();
+ }
}
}
@@ -77,57 +154,133 @@ public class TenantCacheImpl implements TenantCache {
return memoryManager;
}
- private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
+ private Cache<ImmutableBytesPtr,CacheEntry> getServerCaches() {
/* Delay creation of this map until it's needed */
if (serverCaches == null) {
synchronized(this) {
if (serverCaches == null) {
- serverCaches = CacheBuilder.newBuilder()
- .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
- .ticker(getTicker())
- .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
- @Override
- public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) {
- Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
- }
- })
- .build();
+ serverCaches = buildCache(maxTimeToLiveMs, false);
}
}
}
return serverCaches;
}
-
- @Override
+
+ private Cache<ImmutableBytesPtr,CacheEntry> getPersistentServerCaches() {
+ /* Delay creation of this map until it's needed */
+ if (persistentServerCaches == null) {
+ synchronized(this) {
+ if (persistentServerCaches == null) {
+ persistentServerCaches = buildCache(maxPersistenceTimeToLiveMs, true);
+ }
+ }
+ }
+ return persistentServerCaches;
+ }
+
+ private Cache<ImmutableBytesPtr, CacheEntry> buildCache(final int ttl, final boolean isPersistent) {
+ CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder();
+ if (isPersistent) {
+ builder.expireAfterWrite(ttl, TimeUnit.MILLISECONDS);
+ } else {
+ builder.expireAfterAccess(ttl, TimeUnit.MILLISECONDS);
+ }
+ return builder
+ .ticker(getTicker())
+ .removalListener(new RemovalListener<ImmutableBytesPtr, CacheEntry>(){
+ @Override
+ public void onRemoval(RemovalNotification<ImmutableBytesPtr, CacheEntry> notification) {
+ if (isPersistent || !notification.getValue().getUsePersistentCache()) {
+ Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
+ }
+ }
+ })
+ .build();
+ }
+
+ synchronized private void evictInactiveEntries(long bytesNeeded) { // bytesNeeded: extra bytes to free beyond what is available now
+ logger.debug("Trying to evict inactive cache entries to free up " + bytesNeeded + " bytes");
+ CacheEntry[] entries = getPersistentServerCaches().asMap().values().toArray(new CacheEntry[]{});
+ Arrays.sort(entries); // CacheEntry orders by ascending hits, so least-used entries are tried first
+ long available = this.getMemoryManager().getAvailableMemory();
+ final long target = available + bytesNeeded; // compare against the total, not the shortfall alone
+ for (int i = 0; i < entries.length && available < target; i++) {
+ ImmutableBytesPtr cacheId = entries[i].getCacheId(); // NOTE(review): entries with live queries are not skipped here - confirm intended
+ getPersistentServerCaches().invalidate(cacheId);
+ available = this.getMemoryManager().getAvailableMemory();
+ logger.debug("Evicted cache ID " + Bytes.toLong(cacheId.get()) + ", we now have " + available + " bytes available");
+ }
+ }
+
+ private CacheEntry getIfPresent(ImmutableBytesPtr cacheId) {
+ CacheEntry entry = getPersistentServerCaches().getIfPresent(cacheId);
+ if (entry != null) {
+ return entry;
+ }
+ return getServerCaches().getIfPresent(cacheId);
+ }
+
+ @Override
public Closeable getServerCache(ImmutableBytesPtr cacheId) {
getServerCaches().cleanUp();
- return getServerCaches().getIfPresent(cacheId);
+ CacheEntry entry = getIfPresent(cacheId);
+ if (entry == null) {
+ return null;
+ }
+ return entry.closeable;
}
-
+
@Override
- public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException {
+ public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache, int clientVersion) throws SQLException {
getServerCaches().cleanUp();
- MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
+ long available = this.getMemoryManager().getAvailableMemory();
+ int size = cachePtr.getLength() + txState.length;
+ if (size > available) {
+ evictInactiveEntries(size - available + EVICTION_MARGIN_BYTES);
+ }
+ MemoryChunk chunk = this.getMemoryManager().allocate(size);
boolean success = false;
try {
- Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer, clientVersion);
- getServerCaches().put(cacheId, element);
+ CacheEntry entry;
+ synchronized(this) {
+ entry = getIfPresent(cacheId);
+ if (entry == null) {
+ entry = new CacheEntry(
+ cacheId, cachePtr, cacheFactory, txState, chunk,
+ usePersistentCache, useProtoForIndexMaintainer,
+ clientVersion);
+ getServerCaches().put(cacheId, entry);
+ if (usePersistentCache) {
+ getPersistentServerCaches().put(cacheId, entry);
+ }
+ }
+ entry.incrementLiveQueryCount();
+ }
success = true;
- return element;
+ return entry;
} finally {
if (!success) {
Closeables.closeAllQuietly(Collections.singletonList(chunk));
}
- }
+ }
}
-
+
@Override
- public void removeServerCache(ImmutableBytesPtr cacheId) {
- getServerCaches().invalidate(cacheId);
+ synchronized public void removeServerCache(ImmutableBytesPtr cacheId) {
+ CacheEntry entry = getServerCaches().getIfPresent(cacheId);
+ if (entry == null) {
+ return;
+ }
+ entry.decrementLiveQueryCount();
+ if (!entry.isLive()) {
+ logger.debug("Cache ID " + Bytes.toLong(cacheId.get()) + " is no longer live, invalidate it");
+ getServerCaches().invalidate(cacheId);
+ }
}
@Override
public void removeAllServerCache() {
getServerCaches().invalidateAll();
+ getPersistentServerCaches().invalidateAll();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3e5f5ee..603da0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -100,6 +100,7 @@ public class QueryCompiler {
private final SequenceManager sequenceManager;
private final boolean projectTuples;
private final boolean noChildParentJoinOptimization;
+ private final boolean usePersistentCache;
private final boolean optimizeSubquery;
private final Map<TableRef, QueryPlan> dataPlans;
private final boolean costBased;
@@ -117,7 +118,8 @@ public class QueryCompiler {
this.parallelIteratorFactory = parallelIteratorFactory;
this.sequenceManager = sequenceManager;
this.projectTuples = projectTuples;
- this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION);
+ this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION) || select.getHint().hasHint(Hint.USE_PERSISTENT_CACHE);
+ this.usePersistentCache = select.getHint().hasHint(Hint.USE_PERSISTENT_CACHE);
ConnectionQueryServices services = statement.getConnection().getQueryServices();
this.costBased = services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
scan.setLoadColumnFamiliesOnDemand(true);
@@ -314,7 +316,7 @@ public class QueryCompiler {
if (i < count - 1) {
fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
}
- hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+ hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), usePersistentCache, keyRangeLhsExpression, keyRangeRhsExpression);
}
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
@@ -381,9 +383,10 @@ public class QueryCompiler {
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions},
new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true},
new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
+ boolean usePersistentCache = joinTable.getStatement().getHint().hasHint(Hint.USE_PERSISTENT_CACHE);
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
- return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
+ return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, usePersistentCache, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
}
case SORT_MERGE: {
JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index eb195c2..cc38870 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -74,6 +74,7 @@ public class StatementContext {
private final ImmutableBytesWritable tempPtr;
private final PhoenixStatement statement;
private final Map<PColumn, Integer> dataColumns;
+ private Map<Long, Boolean> retryingPersistentCache;
private long currentTime = QueryConstants.UNSET_TIMESTAMP;
private ScanRanges scanRanges = ScanRanges.EVERYTHING;
@@ -138,6 +139,7 @@ public class StatementContext {
this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled,connection.getLogLevel());
this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled,connection.getLogLevel());
+ this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap();
}
/**
@@ -326,5 +328,22 @@ public class StatementContext {
public void setClientSideUpsertSelect(boolean isClientSideUpsertSelect) {
this.isClientSideUpsertSelect = isClientSideUpsertSelect;
}
-
+
+ /**
+ * Flags {@code cacheId} so that the USE_PERSISTENT_CACHE hint is overridden and the persistent cache is not used
+ * for that specific cache ID; the flag is read back through {@code getRetryingPersistentCache(long)}. This can be
+ * used to retry queries that failed when using the persistent cache.
+ */
+ public void setRetryingPersistentCache(long cacheId) {
+ retryingPersistentCache.put(cacheId, true);
+ }
+
+ /**
+  * Reports whether {@code cacheId} has been flagged through {@code setRetryingPersistentCache(long)}.
+  *
+  * @return the stored flag, or {@code false} when no flag was ever recorded for {@code cacheId}
+  */
+ public boolean getRetryingPersistentCache(long cacheId) {
+     return Boolean.TRUE.equals(retryingPersistentCache.get(cacheId));
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index d82aaba..96af154 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -51,7 +51,6 @@ import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TupleUtil;
@@ -122,8 +121,7 @@ public class HashJoinRegionScanner implements RegionScanner {
}
HashCache hashCache = (HashCache)cache.getServerCache(joinId);
if (hashCache == null) {
- Exception cause = new HashJoinCacheNotFoundException(
- Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId)));
+ Exception cause = new HashJoinCacheNotFoundException(Bytes.toLong(joinId.get()));
throw new DoNotRetryIOException(cause.getMessage(), cause);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 9d78659..86219c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -75,7 +75,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()),
cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer(),
- request.hasClientVersion() ? request.getClientVersion() : ScanUtil.UNKNOWN_CLIENT_VERSION);
+ request.getUsePersistentCache(), request.hasClientVersion() ? request.getClientVersion() : ScanUtil.UNKNOWN_CLIENT_VERSION);
} catch (Throwable e) {
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException("Error when adding cache: ", e));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index fdca334..c42b9df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -5760,6 +5760,16 @@ public final class ServerCachingProtos {
* <code>optional int32 clientVersion = 7;</code>
*/
int getClientVersion();
+
+ // optional bool usePersistentCache = 8;
+ /**
+ * <code>optional bool usePersistentCache = 8;</code>
+ */
+ boolean hasUsePersistentCache();
+ /**
+ * <code>optional bool usePersistentCache = 8;</code>
+ */
+ boolean getUsePersistentCache();
}
/**
* Protobuf type {@code AddServerCacheRequest}
@@ -5863,6 +5873,11 @@ public final class ServerCachingProtos {
clientVersion_ = input.readInt32();
break;
}
+ case 64: {
+ bitField0_ |= 0x00000080;
+ usePersistentCache_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6027,6 +6042,22 @@ public final class ServerCachingProtos {
return clientVersion_;
}
+ // optional bool usePersistentCache = 8;
+ public static final int USEPERSISTENTCACHE_FIELD_NUMBER = 8;
+ private boolean usePersistentCache_;
+ /**
+ * <code>optional bool usePersistentCache = 8;</code>
+ */
+ public boolean hasUsePersistentCache() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional bool usePersistentCache = 8;</code>
+ */
+ public boolean getUsePersistentCache() {
+ return usePersistentCache_;
+ }
+
private void initFields() {
tenantId_ = com.google.protobuf.ByteString.EMPTY;
cacheId_ = com.google.protobuf.ByteString.EMPTY;
@@ -6035,6 +6066,7 @@ public final class ServerCachingProtos {
txState_ = com.google.protobuf.ByteString.EMPTY;
hasProtoBufIndexMaintainer_ = false;
clientVersion_ = 0;
+ usePersistentCache_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -6089,6 +6121,9 @@ public final class ServerCachingProtos {
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeInt32(7, clientVersion_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeBool(8, usePersistentCache_);
+ }
getUnknownFields().writeTo(output);
}
@@ -6126,6 +6161,10 @@ public final class ServerCachingProtos {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(7, clientVersion_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(8, usePersistentCache_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -6184,6 +6223,11 @@ public final class ServerCachingProtos {
result = result && (getClientVersion()
== other.getClientVersion());
}
+ result = result && (hasUsePersistentCache() == other.hasUsePersistentCache());
+ if (hasUsePersistentCache()) {
+ result = result && (getUsePersistentCache()
+ == other.getUsePersistentCache());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -6225,6 +6269,10 @@ public final class ServerCachingProtos {
hash = (37 * hash) + CLIENTVERSION_FIELD_NUMBER;
hash = (53 * hash) + getClientVersion();
}
+ if (hasUsePersistentCache()) {
+ hash = (37 * hash) + USEPERSISTENTCACHE_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getUsePersistentCache());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -6358,6 +6406,8 @@ public final class ServerCachingProtos {
bitField0_ = (bitField0_ & ~0x00000020);
clientVersion_ = 0;
bitField0_ = (bitField0_ & ~0x00000040);
+ usePersistentCache_ = false;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@@ -6422,6 +6472,10 @@ public final class ServerCachingProtos {
to_bitField0_ |= 0x00000040;
}
result.clientVersion_ = clientVersion_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.usePersistentCache_ = usePersistentCache_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -6459,6 +6513,9 @@ public final class ServerCachingProtos {
if (other.hasClientVersion()) {
setClientVersion(other.getClientVersion());
}
+ if (other.hasUsePersistentCache()) {
+ setUsePersistentCache(other.getUsePersistentCache());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6914,6 +6971,39 @@ public final class ServerCachingProtos {
return this;
}
+ // optional bool usePersistentCache = 8;
+ private boolean usePersistentCache_ ;
+ /**
+ * <code>optional bool usePersistentCache = 8;</code>
+ */
+ public boolean hasUsePersistentCache() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional bool usePersistentCache = 8;</code>
+ */
+ public boolean getUsePersistentCache() {
+ return usePersistentCache_;
+ }
+ /**
+ * <code>optional bool usePersistentCache = 8;</code>
+ */
+ public Builder setUsePersistentCache(boolean value) {
+ bitField0_ |= 0x00000080;
+ usePersistentCache_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool usePersistentCache = 8;</code>
+ */
+ public Builder clearUsePersistentCache() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ usePersistentCache_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:AddServerCacheRequest)
}
@@ -8723,22 +8813,23 @@ public final class ServerCachingProtos {
"\timmutable\030\022 \002(\010\022&\n\021indexedColumnInfo\030\023 " +
"\003(\0132\013.ColumnInfo\022\026\n\016encodingScheme\030\024 \002(\005" +
"\022\036\n\026immutableStorageScheme\030\025 \002(\005\022\025\n\rview" +
- "IndexType\030\026 \001(\005\"\334\001\n\025AddServerCacheReques" +
+ "IndexType\030\026 \001(\005\"\370\001\n\025AddServerCacheReques" +
"t\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\022)\n\010" +
"cachePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022" +
")\n\014cacheFactory\030\004 \002(\0132\023.ServerCacheFacto" +
"ry\022\017\n\007txState\030\005 \001(\014\022\"\n\032hasProtoBufIndexM" +
- "aintainer\030\006 \001(\010\022\025\n\rclientVersion\030\007 \001(\005\"(",
- "\n\026AddServerCacheResponse\022\016\n\006return\030\001 \002(\010" +
- "\"=\n\030RemoveServerCacheRequest\022\020\n\010tenantId" +
- "\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerC" +
- "acheResponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerC" +
- "achingService\022A\n\016addServerCache\022\026.AddSer" +
- "verCacheRequest\032\027.AddServerCacheResponse" +
- "\022J\n\021removeServerCache\022\031.RemoveServerCach" +
- "eRequest\032\032.RemoveServerCacheResponseBG\n(" +
- "org.apache.phoenix.coprocessor.generated" +
- "B\023ServerCachingProtosH\001\210\001\001\240\001\001"
+ "aintainer\030\006 \001(\010\022\025\n\rclientVersion\030\007 \001(\005\022\032",
+ "\n\022usePersistentCache\030\010 \001(\010\"(\n\026AddServerC" +
+ "acheResponse\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveSe" +
+ "rverCacheRequest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007ca" +
+ "cheId\030\002 \002(\014\"+\n\031RemoveServerCacheResponse" +
+ "\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerCachingServic" +
+ "e\022A\n\016addServerCache\022\026.AddServerCacheRequ" +
+ "est\032\027.AddServerCacheResponse\022J\n\021removeSe" +
+ "rverCache\022\031.RemoveServerCacheRequest\032\032.R" +
+ "emoveServerCacheResponseBG\n(org.apache.p" +
+ "hoenix.coprocessor.generatedB\023ServerCach",
+ "ingProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8774,7 +8865,7 @@ public final class ServerCachingProtos {
internal_static_AddServerCacheRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_AddServerCacheRequest_descriptor,
- new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", "HasProtoBufIndexMaintainer", "ClientVersion", });
+ new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", "HasProtoBufIndexMaintainer", "ClientVersion", "UsePersistentCache", });
internal_static_AddServerCacheResponse_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_AddServerCacheResponse_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index bfe089d..b5cd6b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -22,21 +22,23 @@ import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
import static org.apache.phoenix.util.NumberUtil.add;
import static org.apache.phoenix.util.NumberUtil.getMin;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ExplainPlan;
@@ -46,6 +48,7 @@ import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
@@ -57,9 +60,7 @@ import org.apache.phoenix.expression.InListExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.iterate.FilterResultIterator;
-import org.apache.phoenix.iterate.ParallelScanGrouper;
-import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.*;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
@@ -86,9 +87,11 @@ import org.apache.phoenix.util.SQLCloseables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.phoenix.util.ServerUtil;
public class HashJoinPlan extends DelegateQueryPlan {
private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
+ private static final Random RANDOM = new Random();
private final SelectStatement statement;
private final HashJoinInfo joinInfo;
@@ -105,6 +108,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
private Long estimatedBytes;
private Long estimateInfoTs;
private boolean getEstimatesCalled;
+ private boolean hasSubPlansWithPersistentCache;
public static HashJoinPlan create(SelectStatement statement,
QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException {
@@ -134,8 +138,12 @@ public class HashJoinPlan extends DelegateQueryPlan {
this.recompileWhereClause = recompileWhereClause;
this.tableRefs = Sets.newHashSetWithExpectedSize(subPlans.length + plan.getSourceRefs().size());
this.tableRefs.addAll(plan.getSourceRefs());
+ this.hasSubPlansWithPersistentCache = false;
for (SubPlan subPlan : subPlans) {
tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs());
+ if (subPlan instanceof HashSubPlan && ((HashSubPlan)subPlan).usePersistentCache) {
+ this.hasSubPlansWithPersistentCache = true;
+ }
}
QueryServices services = plan.getContext().getConnection().getQueryServices();
this.maxServerCacheTimeToLive = services.getProps().getInt(
@@ -214,7 +222,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
SQLCloseables.closeAllQuietly(dependencies.values());
throw firstException;
}
-
+
Expression postFilter = null;
boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty();
if (recompileWhereClause || hasKeyRangeExpressions) {
@@ -241,8 +249,35 @@ public class HashJoinPlan extends DelegateQueryPlan {
if (statement.getInnerSelectStatement() != null && postFilter != null) {
iterator = new FilterResultIterator(iterator, postFilter);
}
-
- return iterator;
+
+ if (hasSubPlansWithPersistentCache) {
+ return peekForPersistentCache(iterator, scanGrouper, scan);
+ } else {
+ return iterator;
+ }
+ }
+
+ private ResultIterator peekForPersistentCache(ResultIterator iterator, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ // The persistent subquery cache is optimistic: it assumes the caches are already present on the region
+ // servers. Peeking at one result verifies that assumption before the iterator is returned to the caller. On a
+ // cache-missing exception the query is re-run with the persistent cache disabled for that specific cache ID;
+ // a cache ID that is already flagged as retrying is not retried again and its exception is rethrown.
+ PeekingResultIterator peeking = LookAheadResultIterator.wrap(iterator);
+ try {
+ peeking.peek();
+ } catch (Exception e) {
+ try {
+ throw ServerUtil.parseServerException(e); // rethrown so the typed catch below can single out the cache-miss case
+ } catch (HashJoinCacheNotFoundException e2) {
+ Long cacheId = e2.getCacheId();
+ if (delegate.getContext().getRetryingPersistentCache(cacheId)) {
+ throw e2; // this cache ID was already flagged for a retry; do not loop
+ }
+ delegate.getContext().setRetryingPersistentCache(cacheId);
+ return iterator(scanGrouper, scan); // NOTE(review): the failed iterator is not closed before the retry -- confirm that is intended
+ }
+ }
+ return peeking;
}
private Expression createKeyRangeExpression(Expression lhsExpression,
@@ -467,20 +502,29 @@ public class HashJoinPlan extends DelegateQueryPlan {
private final QueryPlan plan;
private final List<Expression> hashExpressions;
private final boolean singleValueOnly;
+ private final boolean usePersistentCache;
private final Expression keyRangeLhsExpression;
private final Expression keyRangeRhsExpression;
+ private final MessageDigest digest;
public HashSubPlan(int index, QueryPlan subPlan,
List<Expression> hashExpressions,
boolean singleValueOnly,
+ boolean usePersistentCache, // QueryCompiler passes whether the USE_PERSISTENT_CACHE hint is set on the statement
Expression keyRangeLhsExpression,
Expression keyRangeRhsExpression) {
this.index = index;
this.plan = subPlan;
this.hashExpressions = hashExpressions;
this.singleValueOnly = singleValueOnly;
+ this.usePersistentCache = usePersistentCache;
this.keyRangeLhsExpression = keyRangeLhsExpression;
this.keyRangeRhsExpression = keyRangeRhsExpression;
+ try {
+ this.digest = MessageDigest.getInstance("SHA-256"); // used further down to derive the persistent cache ID from the sub-query text
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e); // surfaces the checked exception as an unchecked one, cause preserved
+ }
}
@Override
@@ -494,19 +538,37 @@ public class HashJoinPlan extends DelegateQueryPlan {
if (hashExpressions != null) {
ResultIterator iterator = plan.iterator();
try {
- cache =
- parent.hashClient.addHashCache(ranges, iterator,
- plan.getEstimatedSize(), hashExpressions, singleValueOnly,
+ final byte[] cacheId;
+ String queryString = plan.getStatement().toString().replaceAll("\\$[0-9]+", "\\$");
+ if (usePersistentCache) {
+ cacheId = Arrays.copyOfRange(digest.digest(queryString.getBytes()), 0, 8);
+ boolean retrying = parent.delegate.getContext().getRetryingPersistentCache(Bytes.toLong(cacheId));
+ if (!retrying) {
+ try {
+ cache = parent.hashClient.createServerCache(cacheId, parent.delegate);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ } else {
+ cacheId = Bytes.toBytes(RANDOM.nextLong());
+ }
+ LOG.debug("Using cache ID " + Hex.encodeHexString(cacheId) + " for " + queryString);
+ if (cache == null) {
+ LOG.debug("Making RPC to add cache " + Hex.encodeHexString(cacheId));
+ cache = parent.hashClient.addHashCache(ranges, cacheId, iterator,
+ plan.getEstimatedSize(), hashExpressions, singleValueOnly, usePersistentCache,
parent.delegate.getTableRef().getTable(), keyRangeRhsExpression,
keyRangeRhsValues);
- long endTime = System.currentTimeMillis();
- boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
- if (!isSet && (endTime
- - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
- LOG.warn(addCustomAnnotations(
- "Hash plan [" + index
- + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.",
- parent.delegate.getContext().getConnection()));
+ long endTime = System.currentTimeMillis();
+ boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
+ if (!isSet && (endTime
+ - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
+ LOG.warn(addCustomAnnotations(
+ "Hash plan [" + index
+ + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.",
+ parent.delegate.getContext().getConnection()));
+ }
}
} finally {
iterator.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index d890383..2378175 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -1309,8 +1309,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
throw e2;
}
Long cacheId = ((HashJoinCacheNotFoundException)e2).getCacheId();
- if (!hashCacheClient.addHashCacheToServer(startKey,
- caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))), plan.getTableRef().getTable())) { throw e2; }
+ ServerCache cache = caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
+ if (cache .getCachePtr() != null) {
+ if (!hashCacheClient.addHashCacheToServer(startKey, cache, plan.getTableRef().getTable())) {
+ throw e2;
+ }
+ }
}
concatIterators =
recreateIterators(services, isLocalIndex, allIterators,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 06f612a..f1d1663 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -194,13 +194,11 @@ public class TableResultIterator implements ResultIterator {
if (retry <= 0) {
throw e1;
}
+ Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
retry--;
try {
- Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
-
ServerCache cache = caches == null ? null :
- caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
-
+ caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
cache, plan.getTableRef().getTable())) {
throw e1;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 83ac32d..315c515 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
@@ -67,6 +68,20 @@ public class HashCacheClient {
}
/**
+ * Creates a ServerCache object for cacheId. This is used for the persistent cache, and there may or may not
+ * be corresponding data on each region server.
+ * @param cacheId ID for the cache entry
+ * @param delegate the query plan this will be used for
+ * @return client-side {@link ServerCache} representing the hash cache that may or may not be present on region servers.
+ * @throws SQLException propagated from {@code serverCache.createServerCache}
+ * @throws IOException propagated from {@code serverCache.createServerCache}
+ */
+ public ServerCache createServerCache(final byte[] cacheId, QueryPlan delegate)
+ throws SQLException, IOException {
+ return serverCache.createServerCache(cacheId, delegate);
+ }
+
+ /**
* Send the results of scanning through the scanner to all
* region servers for regions of the table that will use the cache
* that intersect with the minMaxKeyRange.
@@ -76,13 +91,16 @@ public class HashCacheClient {
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
- public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, PTable cacheUsingTable, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
+ public ServerCache addHashCache(
+ ScanRanges keyRanges, byte[] cacheId, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions,
+ boolean singleValueOnly, boolean usePersistentCache, PTable cacheUsingTable, Expression keyRangeRhsExpression,
+ List<Expression> keyRangeRhsValues) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
- ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, true);
+ ServerCache cache = serverCache.addServerCache(keyRanges, cacheId, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, usePersistentCache, true); // cacheId and usePersistentCache are supplied by the caller and passed through unchanged
return cache;
}
@@ -90,7 +108,7 @@ public class HashCacheClient {
* Should only be used to resend the hash table cache to the regionserver.
*
* @param startkeyOfRegion start key of any region hosted on a regionserver which needs hash cache
- * @param cacheId Id of the cache which needs to be sent
+ * @param cache The cache which needs to be sent
* @param pTable
* @return
* @throws Exception
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 4fc3c70..ecf9d57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import net.jcip.annotations.Immutable;
@@ -139,6 +140,16 @@ public class HashCacheFactory implements ServerCacheFactory {
}
@Override
+ public String toString() {
+     // Entries are written back to back; no separator is emitted between them.
+     StringBuilder text = new StringBuilder();
+     for (ImmutableBytesPtr cacheKey : hashCache.keySet()) {
+         text.append("key: ").append(cacheKey).append(" value: ").append(hashCache.get(cacheKey));
+     }
+     return text.toString();
+ }
+
+ @Override
public void close() {
memoryChunk.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index 02a44ad..8a83116 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -83,6 +83,10 @@ public class HintNode {
*/
USE_SORT_MERGE_JOIN,
/**
+ * Persist the RHS results of a hash join.
+ */
+ USE_PERSISTENT_CACHE,
+ /**
* Avoid using star-join optimization. Used for broadcast join (hash join) only.
*/
NO_STAR_JOIN,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index d681a13..d1b277a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -90,6 +90,7 @@ public interface QueryServices extends SQLCloseable {
public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes";
public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
+ public static final String MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCachePersistenceTimeToLiveMs";
@Deprecated // Use FORCE_ROW_KEY_ORDER instead.
public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 76e79fa..35dbe3a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -159,6 +159,7 @@ public class QueryServicesOptions {
public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152;
// The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be.
public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; // 30 sec (with no activity)
+ public static final int DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS = 30 * 60000; // 30 minutes
public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = DEFAULT_MAX_QUERY_CONCURRENCY;
public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
index c741f4e..3c8a269 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
@@ -17,9 +17,6 @@
*/
package org.apache.phoenix.cache;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
@@ -38,49 +35,53 @@ import org.junit.Test;
import com.google.common.base.Ticker;
+import static org.junit.Assert.*;
+
public class TenantCacheTest {
@Test
public void testInvalidateClosesMemoryChunk() throws SQLException {
int maxServerCacheTimeToLive = 10000;
+ int maxServerCachePersistenceTimeToLive = 10;
long maxBytes = 1000;
GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
- TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive);
- ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a"));
+ TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive);
+ ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes(1L));
ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
- newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+ newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
newTenantCache.removeServerCache(cacheId);
assertEquals(maxBytes, memoryManager.getAvailableMemory());
}
-
+
@Test
public void testTimeoutClosesMemoryChunk() throws Exception {
int maxServerCacheTimeToLive = 10;
+ int maxServerCachePersistenceTimeToLive = 10;
long maxBytes = 1000;
GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
ManualTicker ticker = new ManualTicker();
- TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
- ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
+ TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+ ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
- cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+ cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
cache.cleanUp();
assertEquals(maxBytes, memoryManager.getAvailableMemory());
}
-
@Test
public void testFreeMemoryOnAccess() throws Exception {
int maxServerCacheTimeToLive = 10;
+ int maxServerCachePersistenceTimeToLive = 10;
long maxBytes = 1000;
GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
ManualTicker ticker = new ManualTicker();
- TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
- ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
+ TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+ ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
- cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+ cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
assertNull(cache.getServerCache(cacheId1));
@@ -90,17 +91,92 @@ public class TenantCacheTest {
@Test
public void testExpiredCacheOnAddingNew() throws Exception {
int maxServerCacheTimeToLive = 10;
+ int maxServerCachePersistenceTimeToLive = 10;
long maxBytes = 10;
GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
ManualTicker ticker = new ManualTicker();
- TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
- ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
+ TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+ ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("12345678"));
- cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+ cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
assertEquals(2, memoryManager.getAvailableMemory());
ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
- cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
+ cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION);
+ assertEquals(2, memoryManager.getAvailableMemory());
+ }
+
+ @Test
+ public void testExpiresButStaysInPersistentAfterTimeout() throws Exception {
+ int maxServerCacheTimeToLive = 100;
+ int maxServerCachePersistenceTimeToLive = 1000;
+ long maxBytes = 1000;
+ GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
+ ManualTicker ticker = new ManualTicker();
+ TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+ ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
+ ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
+ cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION);
+ assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
+ assertNotNull(cache.getServerCache(cacheId1));
+
+ // Expire it from live cache but not persistent cache
+ ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
+ cache.cleanUp();
+ assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
+ assertNotNull(cache.getServerCache(cacheId1));
+
+ // Expire it from persistent cache as well
+ ticker.time += (maxServerCachePersistenceTimeToLive + 1) * 1000000;
+ cache.cleanUp();
+ assertEquals(maxBytes, memoryManager.getAvailableMemory());
+ assertNull(cache.getServerCache(cacheId1));
+ }
+
+ @Test
+ public void testExpiresButStaysInPersistentAfterRemove() throws Exception {
+ int maxServerCacheTimeToLive = 100;
+ int maxServerCachePersistenceTimeToLive = 1000;
+ long maxBytes = 1000;
+ GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
+ ManualTicker ticker = new ManualTicker();
+ TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+ ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
+ ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("12"));
+ cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION);
+ assertEquals(maxBytes-2, memoryManager.getAvailableMemory());
+ assertNotNull(cache.getServerCache(cacheId1));
+
+ // Remove should only remove from live cache
+ cache.removeServerCache(cacheId1);
+ assertEquals(maxBytes-2, memoryManager.getAvailableMemory());
+ assertNotNull(cache.getServerCache(cacheId1));
+ }
+
+ @Test
+ public void testEvictPersistentCacheIfSpaceIsNeeded() throws Exception {
+ int maxServerCacheTimeToLive = 100;
+ int maxServerCachePersistenceTimeToLive = 1000;
+ long maxBytes = 10;
+ GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
+ ManualTicker ticker = new ManualTicker();
+ TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker);
+ ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L));
+ ImmutableBytesWritable cachePtr1 = new ImmutableBytesWritable(Bytes.toBytes("1234"));
+ cache.addServerCache(cacheId1, cachePtr1, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION);
+ assertEquals(6, memoryManager.getAvailableMemory());
+
+ // Remove it, but it should stay in persistent cache
+ cache.removeServerCache(cacheId1);
+ assertNotNull(cache.getServerCache(cacheId1));
+ assertEquals(6, memoryManager.getAvailableMemory());
+
+ // Let's do an entry that will require eviction
+ ImmutableBytesPtr cacheId2 = new ImmutableBytesPtr(Bytes.toBytes(2L));
+ ImmutableBytesWritable cachePtr2 = new ImmutableBytesWritable(Bytes.toBytes("12345678"));
+ cache.addServerCache(cacheId2, cachePtr2, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION);
assertEquals(2, memoryManager.getAvailableMemory());
+ assertNull(cache.getServerCache(cacheId1));
+ assertNotNull(cache.getServerCache(cacheId2));
}
public static class ManualTicker extends Ticker {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e0405b7/phoenix-protocol/src/main/ServerCachingService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto
index d92f2cd..0d2d1d2 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -73,6 +73,7 @@ message AddServerCacheRequest {
optional bytes txState = 5;
optional bool hasProtoBufIndexMaintainer = 6;
optional int32 clientVersion = 7;
+ optional bool usePersistentCache = 8;
}
message AddServerCacheResponse {