You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ss...@apache.org on 2017/09/27 19:16:49 UTC
[1/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for
HashJoin doesn't work when cache has expired on server side
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 ca664fe10 -> 51fae875a
refs/heads/4.x-HBase-1.1 4e51fe772 -> 8a7ba9dde
refs/heads/4.x-HBase-1.2 d714afcec -> 53016519d
refs/heads/master 764eb8f13 -> 033a2fc2a
PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/51fae875
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/51fae875
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/51fae875
Branch: refs/heads/4.x-HBase-0.98
Commit: 51fae875a97e143a923671314ee11205be6223cf
Parents: ca664fe
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Sep 27 12:07:56 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
.../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/51fae875/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 3f60d9b..f6ca749 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
import java.io.IOException;
import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
@RunWith(Parameterized.class)
public class HashJoinCacheIT extends HashJoinIT {
@@ -427,7 +437,27 @@ public class HashJoinCacheIT extends HashJoinIT {
public void testUpsertWithJoin() throws Exception {
// TODO: We will enable this test once PHOENIX-3163
}
-
+
+ @Test
+ public void testExpiredCache() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+ String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+ String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " +
+ tableName1 + " supp RIGHT JOIN " + tableName2 +
+ " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ rs.next();
+ fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+ } catch (HashJoinCacheNotFoundException e) {
+ //Expected exception
+ }
+ }
+
public static class InvalidateHashCache extends SimpleRegionObserver {
public static Random rand= new Random();
public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/51fae875/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
public class ServerCache implements SQLCloseable {
private final int size;
private final byte[] id;
- private final Set<HRegionLocation> servers;
+ private final Map<HRegionLocation, Long> servers;
private ImmutableBytesWritable cachePtr;
private MemoryChunk chunk;
private File outputFile;
+ private long maxServerCacheTTL;
public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
+ maxServerCacheTTL = services.getProps().getInt(
+ QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
this.id = id;
- this.servers = new HashSet<HRegionLocation>(servers);
+ this.servers = new HashMap();
+ long currentTime = System.currentTimeMillis();
+ for(HRegionLocation loc : servers) {
+ this.servers.put(loc, currentTime);
+ }
this.size = cachePtr.getLength();
if (storeCacheOnClient) {
try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
public byte[] getId() {
return id;
}
-
- public boolean addServer(HRegionLocation loc) {
- return this.servers.add(loc);
- }
+
+ public boolean addServer(HRegionLocation loc) {
+ if(this.servers.containsKey(loc)) {
+ return false;
+ } else {
+ this.servers.put(loc, System.currentTimeMillis());
+ return true;
+ }
+ }
+
+ public boolean isExpired(HRegionLocation loc) {
+ if(this.servers.containsKey(loc)) {
+ Long time = this.servers.get(loc);
+ if(System.currentTimeMillis() - time > maxServerCacheTTL)
+ return true; // cache was send more than maxTTL ms ago, expecting that it's expired
+ } else {
+ return false; // should be on server yet.
+ }
+ return false; // Unknown region location. Need to send the cache.
+ }
+
+
/**
* Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
@Override
public void close() throws SQLException {
try{
- removeServerCache(this, servers);
+ removeServerCache(this, servers.keySet());
}finally{
cachePtr = null;
if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
/**
* Remove the cached table from all region servers
- * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
- * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
* @throws SQLException
* @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
*/
@@ -421,6 +446,9 @@ public class ServerCacheClient {
byte[] tableName = pTable.getPhysicalName().getBytes();
table = services.getTable(tableName);
HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+ if(cache.isExpired(tableRegionLocation)) {
+ return false;
+ }
if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
txState);
[3/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for
HashJoin doesn't work when cache has expired on server side
Posted by ss...@apache.org.
PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8a7ba9dd
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8a7ba9dd
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8a7ba9dd
Branch: refs/heads/4.x-HBase-1.1
Commit: 8a7ba9ddef5b2c3fea7fc5d57d4185b11c3267e4
Parents: 4e51fe7
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Sep 27 12:09:17 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
.../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a7ba9dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 76f45e2..cebb9ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
import java.io.IOException;
import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
@RunWith(Parameterized.class)
public class HashJoinCacheIT extends HashJoinIT {
@@ -426,7 +436,27 @@ public class HashJoinCacheIT extends HashJoinIT {
public void testUpsertWithJoin() throws Exception {
// TODO: We will enable this test once PHOENIX-3163
}
-
+
+ @Test
+ public void testExpiredCache() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+ String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+ String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " +
+ tableName1 + " supp RIGHT JOIN " + tableName2 +
+ " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ rs.next();
+ fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+ } catch (HashJoinCacheNotFoundException e) {
+ //Expected exception
+ }
+ }
+
public static class InvalidateHashCache extends SimpleRegionObserver {
public static Random rand= new Random();
public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a7ba9dd/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
public class ServerCache implements SQLCloseable {
private final int size;
private final byte[] id;
- private final Set<HRegionLocation> servers;
+ private final Map<HRegionLocation, Long> servers;
private ImmutableBytesWritable cachePtr;
private MemoryChunk chunk;
private File outputFile;
+ private long maxServerCacheTTL;
public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
+ maxServerCacheTTL = services.getProps().getInt(
+ QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
this.id = id;
- this.servers = new HashSet<HRegionLocation>(servers);
+ this.servers = new HashMap();
+ long currentTime = System.currentTimeMillis();
+ for(HRegionLocation loc : servers) {
+ this.servers.put(loc, currentTime);
+ }
this.size = cachePtr.getLength();
if (storeCacheOnClient) {
try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
public byte[] getId() {
return id;
}
-
- public boolean addServer(HRegionLocation loc) {
- return this.servers.add(loc);
- }
+
+ public boolean addServer(HRegionLocation loc) {
+ if(this.servers.containsKey(loc)) {
+ return false;
+ } else {
+ this.servers.put(loc, System.currentTimeMillis());
+ return true;
+ }
+ }
+
+ public boolean isExpired(HRegionLocation loc) {
+ if(this.servers.containsKey(loc)) {
+ Long time = this.servers.get(loc);
+ if(System.currentTimeMillis() - time > maxServerCacheTTL)
+ return true; // cache was send more than maxTTL ms ago, expecting that it's expired
+ } else {
+ return false; // should be on server yet.
+ }
+ return false; // Unknown region location. Need to send the cache.
+ }
+
+
/**
* Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
@Override
public void close() throws SQLException {
try{
- removeServerCache(this, servers);
+ removeServerCache(this, servers.keySet());
}finally{
cachePtr = null;
if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
/**
* Remove the cached table from all region servers
- * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
- * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
* @throws SQLException
* @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
*/
@@ -421,6 +446,9 @@ public class ServerCacheClient {
byte[] tableName = pTable.getPhysicalName().getBytes();
table = services.getTable(tableName);
HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+ if(cache.isExpired(tableRegionLocation)) {
+ return false;
+ }
if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
txState);
[2/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for
HashJoin doesn't work when cache has expired on server side
Posted by ss...@apache.org.
PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/53016519
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/53016519
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/53016519
Branch: refs/heads/4.x-HBase-1.2
Commit: 53016519df73606f49433470768c5037b69ea185
Parents: d714afc
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Sep 27 12:09:08 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
.../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/53016519/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 76f45e2..cebb9ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
import java.io.IOException;
import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
@RunWith(Parameterized.class)
public class HashJoinCacheIT extends HashJoinIT {
@@ -426,7 +436,27 @@ public class HashJoinCacheIT extends HashJoinIT {
public void testUpsertWithJoin() throws Exception {
// TODO: We will enable this test once PHOENIX-3163
}
-
+
+ @Test
+ public void testExpiredCache() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+ String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+ String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " +
+ tableName1 + " supp RIGHT JOIN " + tableName2 +
+ " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ rs.next();
+ fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+ } catch (HashJoinCacheNotFoundException e) {
+ //Expected exception
+ }
+ }
+
public static class InvalidateHashCache extends SimpleRegionObserver {
public static Random rand= new Random();
public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/53016519/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
public class ServerCache implements SQLCloseable {
private final int size;
private final byte[] id;
- private final Set<HRegionLocation> servers;
+ private final Map<HRegionLocation, Long> servers;
private ImmutableBytesWritable cachePtr;
private MemoryChunk chunk;
private File outputFile;
+ private long maxServerCacheTTL;
public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
+ maxServerCacheTTL = services.getProps().getInt(
+ QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
this.id = id;
- this.servers = new HashSet<HRegionLocation>(servers);
+ this.servers = new HashMap();
+ long currentTime = System.currentTimeMillis();
+ for(HRegionLocation loc : servers) {
+ this.servers.put(loc, currentTime);
+ }
this.size = cachePtr.getLength();
if (storeCacheOnClient) {
try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
public byte[] getId() {
return id;
}
-
- public boolean addServer(HRegionLocation loc) {
- return this.servers.add(loc);
- }
+
+ public boolean addServer(HRegionLocation loc) {
+ if(this.servers.containsKey(loc)) {
+ return false;
+ } else {
+ this.servers.put(loc, System.currentTimeMillis());
+ return true;
+ }
+ }
+
+ public boolean isExpired(HRegionLocation loc) {
+ if(this.servers.containsKey(loc)) {
+ Long time = this.servers.get(loc);
+ if(System.currentTimeMillis() - time > maxServerCacheTTL)
+ return true; // cache was send more than maxTTL ms ago, expecting that it's expired
+ } else {
+ return false; // should be on server yet.
+ }
+ return false; // Unknown region location. Need to send the cache.
+ }
+
+
/**
* Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
@Override
public void close() throws SQLException {
try{
- removeServerCache(this, servers);
+ removeServerCache(this, servers.keySet());
}finally{
cachePtr = null;
if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
/**
* Remove the cached table from all region servers
- * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
- * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
* @throws SQLException
* @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
*/
@@ -421,6 +446,9 @@ public class ServerCacheClient {
byte[] tableName = pTable.getPhysicalName().getBytes();
table = services.getTable(tableName);
HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+ if(cache.isExpired(tableRegionLocation)) {
+ return false;
+ }
if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
txState);
[4/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for
HashJoin doesn't work when cache has expired on server side
Posted by ss...@apache.org.
PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/033a2fc2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/033a2fc2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/033a2fc2
Branch: refs/heads/master
Commit: 033a2fc2a91052a6db94da55d87c173d4dbdabab
Parents: 764eb8f
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Sep 27 12:09:25 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
.../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/033a2fc2/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 76f45e2..cebb9ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
import java.io.IOException;
import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
@RunWith(Parameterized.class)
public class HashJoinCacheIT extends HashJoinIT {
@@ -426,7 +436,27 @@ public class HashJoinCacheIT extends HashJoinIT {
public void testUpsertWithJoin() throws Exception {
// TODO: We will enable this test once PHOENIX-3163
}
-
+
+ @Test
+ public void testExpiredCache() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+ String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+ String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " +
+ tableName1 + " supp RIGHT JOIN " + tableName2 +
+ " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ rs.next();
+ fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+ } catch (HashJoinCacheNotFoundException e) {
+ //Expected exception
+ }
+ }
+
public static class InvalidateHashCache extends SimpleRegionObserver {
public static Random rand= new Random();
public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/033a2fc2/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
public class ServerCache implements SQLCloseable {
private final int size;
private final byte[] id;
- private final Set<HRegionLocation> servers;
+ private final Map<HRegionLocation, Long> servers;
private ImmutableBytesWritable cachePtr;
private MemoryChunk chunk;
private File outputFile;
+ private long maxServerCacheTTL;
public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
+ maxServerCacheTTL = services.getProps().getInt(
+ QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
this.id = id;
- this.servers = new HashSet<HRegionLocation>(servers);
+ this.servers = new HashMap();
+ long currentTime = System.currentTimeMillis();
+ for(HRegionLocation loc : servers) {
+ this.servers.put(loc, currentTime);
+ }
this.size = cachePtr.getLength();
if (storeCacheOnClient) {
try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
public byte[] getId() {
return id;
}
-
- public boolean addServer(HRegionLocation loc) {
- return this.servers.add(loc);
- }
+
+ public boolean addServer(HRegionLocation loc) {
+ if(this.servers.containsKey(loc)) {
+ return false;
+ } else {
+ this.servers.put(loc, System.currentTimeMillis());
+ return true;
+ }
+ }
+
+ public boolean isExpired(HRegionLocation loc) {
+ if(this.servers.containsKey(loc)) {
+ Long time = this.servers.get(loc);
+ if(System.currentTimeMillis() - time > maxServerCacheTTL)
+ return true; // cache was send more than maxTTL ms ago, expecting that it's expired
+ } else {
+ return false; // should be on server yet.
+ }
+ return false; // Unknown region location. Need to send the cache.
+ }
+
+
/**
* Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
@Override
public void close() throws SQLException {
try{
- removeServerCache(this, servers);
+ removeServerCache(this, servers.keySet());
}finally{
cachePtr = null;
if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
/**
* Remove the cached table from all region servers
- * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
- * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
* @throws SQLException
* @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
*/
@@ -421,6 +446,9 @@ public class ServerCacheClient {
byte[] tableName = pTable.getPhysicalName().getBytes();
table = services.getTable(tableName);
HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+ if(cache.isExpired(tableRegionLocation)) {
+ return false;
+ }
if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
txState);