You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ss...@apache.org on 2017/09/27 19:16:49 UTC

[1/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 ca664fe10 -> 51fae875a
  refs/heads/4.x-HBase-1.1 4e51fe772 -> 8a7ba9dde
  refs/heads/4.x-HBase-1.2 d714afcec -> 53016519d
  refs/heads/master 764eb8f13 -> 033a2fc2a


PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/51fae875
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/51fae875
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/51fae875

Branch: refs/heads/4.x-HBase-0.98
Commit: 51fae875a97e143a923671314ee11205be6223cf
Parents: ca664fe
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Sep 27 12:07:56 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
 .../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
 2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/51fae875/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 3f60d9b..f6ca749 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
 @RunWith(Parameterized.class)
 public class HashJoinCacheIT extends HashJoinIT {
     
@@ -427,7 +437,27 @@ public class HashJoinCacheIT extends HashJoinIT {
 	public void testUpsertWithJoin() throws Exception {
 		// TODO: We will enable this test once PHOENIX-3163
 	}
-    
+
+    @Test
+    public void testExpiredCache() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+        String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+        String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " +
+                tableName1 + " supp RIGHT JOIN " + tableName2 +
+                " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            rs.next();
+            fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+        } catch (HashJoinCacheNotFoundException e) {
+            //Expected exception
+        }
+    }
+
     public static class InvalidateHashCache extends SimpleRegionObserver {
         public static Random rand= new Random();
         public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/51fae875/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
     public class ServerCache implements SQLCloseable {
         private final int size;
         private final byte[] id;
-        private final Set<HRegionLocation> servers;
+        private final Map<HRegionLocation, Long> servers;
         private ImmutableBytesWritable cachePtr;
         private MemoryChunk chunk;
         private File outputFile;
+        private long maxServerCacheTTL;
         
         
         public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
                 ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
+            maxServerCacheTTL = services.getProps().getInt(
+                    QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
             this.id = id;
-            this.servers = new HashSet<HRegionLocation>(servers);
+            this.servers = new HashMap();
+            long currentTime = System.currentTimeMillis();
+            for(HRegionLocation loc : servers) {
+                this.servers.put(loc, currentTime);
+            }
             this.size =  cachePtr.getLength();
             if (storeCacheOnClient) {
                 try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
         public byte[] getId() {
             return id;
         }
-        
-		public boolean addServer(HRegionLocation loc) {
-			return this.servers.add(loc);
-		}
+
+        public boolean addServer(HRegionLocation loc) {
+            if(this.servers.containsKey(loc)) {
+                return false;
+            } else {
+                this.servers.put(loc, System.currentTimeMillis());
+                return true;
+            }
+        }
+
+        public boolean isExpired(HRegionLocation loc) {
+            if(this.servers.containsKey(loc)) {
+                Long time = this.servers.get(loc);
+                if(System.currentTimeMillis() - time > maxServerCacheTTL)
+                    return true; // cache was sent more than maxTTL ms ago, so assume it has expired on the server
+            } else {
+                return false; // Unknown region location: nothing was sent there yet, so the cache still needs to be sent.
+            }
+            return false; // Sent less than maxTTL ms ago, so the cache should still be on the server.
+        }
+
+
         
         /**
          * Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
         @Override
         public void close() throws SQLException {
             try{
-                removeServerCache(this, servers);
+                removeServerCache(this, servers.keySet());
             }finally{
                 cachePtr = null;
                 if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
     
     /**
      * Remove the cached table from all region servers
-     * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
-     * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
      * @throws SQLException
      * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
      */
@@ -421,6 +446,9 @@ public class ServerCacheClient {
             byte[] tableName = pTable.getPhysicalName().getBytes();
             table = services.getTable(tableName);
             HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+            if(cache.isExpired(tableRegionLocation)) {
+                return false;
+            }
 			if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
 				success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
 						txState);


[3/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side

Posted by ss...@apache.org.
PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8a7ba9dd
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8a7ba9dd
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8a7ba9dd

Branch: refs/heads/4.x-HBase-1.1
Commit: 8a7ba9ddef5b2c3fea7fc5d57d4185b11c3267e4
Parents: 4e51fe7
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Sep 27 12:09:17 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
 .../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
 2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a7ba9dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 76f45e2..cebb9ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
 @RunWith(Parameterized.class)
 public class HashJoinCacheIT extends HashJoinIT {
     
@@ -426,7 +436,27 @@ public class HashJoinCacheIT extends HashJoinIT {
 	public void testUpsertWithJoin() throws Exception {
 		// TODO: We will enable this test once PHOENIX-3163
 	}
-    
+
+    @Test
+    public void testExpiredCache() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+        String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+        String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " +
+                tableName1 + " supp RIGHT JOIN " + tableName2 +
+                " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            rs.next();
+            fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+        } catch (HashJoinCacheNotFoundException e) {
+            //Expected exception
+        }
+    }
+
     public static class InvalidateHashCache extends SimpleRegionObserver {
         public static Random rand= new Random();
         public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a7ba9dd/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
     public class ServerCache implements SQLCloseable {
         private final int size;
         private final byte[] id;
-        private final Set<HRegionLocation> servers;
+        private final Map<HRegionLocation, Long> servers;
         private ImmutableBytesWritable cachePtr;
         private MemoryChunk chunk;
         private File outputFile;
+        private long maxServerCacheTTL;
         
         
         public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
                 ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
+            maxServerCacheTTL = services.getProps().getInt(
+                    QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
             this.id = id;
-            this.servers = new HashSet<HRegionLocation>(servers);
+            this.servers = new HashMap();
+            long currentTime = System.currentTimeMillis();
+            for(HRegionLocation loc : servers) {
+                this.servers.put(loc, currentTime);
+            }
             this.size =  cachePtr.getLength();
             if (storeCacheOnClient) {
                 try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
         public byte[] getId() {
             return id;
         }
-        
-		public boolean addServer(HRegionLocation loc) {
-			return this.servers.add(loc);
-		}
+
+        public boolean addServer(HRegionLocation loc) {
+            if(this.servers.containsKey(loc)) {
+                return false;
+            } else {
+                this.servers.put(loc, System.currentTimeMillis());
+                return true;
+            }
+        }
+
+        public boolean isExpired(HRegionLocation loc) {
+            if(this.servers.containsKey(loc)) {
+                Long time = this.servers.get(loc);
+                if(System.currentTimeMillis() - time > maxServerCacheTTL)
+                    return true; // cache was sent more than maxTTL ms ago, so assume it has expired on the server
+            } else {
+                return false; // Unknown region location: nothing was sent there yet, so the cache still needs to be sent.
+            }
+            return false; // Sent less than maxTTL ms ago, so the cache should still be on the server.
+        }
+
+
         
         /**
          * Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
         @Override
         public void close() throws SQLException {
             try{
-                removeServerCache(this, servers);
+                removeServerCache(this, servers.keySet());
             }finally{
                 cachePtr = null;
                 if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
     
     /**
      * Remove the cached table from all region servers
-     * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
-     * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
      * @throws SQLException
      * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
      */
@@ -421,6 +446,9 @@ public class ServerCacheClient {
             byte[] tableName = pTable.getPhysicalName().getBytes();
             table = services.getTable(tableName);
             HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+            if(cache.isExpired(tableRegionLocation)) {
+                return false;
+            }
 			if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
 				success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
 						txState);


[2/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side

Posted by ss...@apache.org.
PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/53016519
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/53016519
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/53016519

Branch: refs/heads/4.x-HBase-1.2
Commit: 53016519df73606f49433470768c5037b69ea185
Parents: d714afc
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Sep 27 12:09:08 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
 .../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
 2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/53016519/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 76f45e2..cebb9ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
 @RunWith(Parameterized.class)
 public class HashJoinCacheIT extends HashJoinIT {
     
@@ -426,7 +436,27 @@ public class HashJoinCacheIT extends HashJoinIT {
 	public void testUpsertWithJoin() throws Exception {
 		// TODO: We will enable this test once PHOENIX-3163
 	}
-    
+
+    @Test
+    public void testExpiredCache() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+        String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+        String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " +
+                tableName1 + " supp RIGHT JOIN " + tableName2 +
+                " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            rs.next();
+            fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+        } catch (HashJoinCacheNotFoundException e) {
+            //Expected exception
+        }
+    }
+
     public static class InvalidateHashCache extends SimpleRegionObserver {
         public static Random rand= new Random();
         public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53016519/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
     public class ServerCache implements SQLCloseable {
         private final int size;
         private final byte[] id;
-        private final Set<HRegionLocation> servers;
+        private final Map<HRegionLocation, Long> servers;
         private ImmutableBytesWritable cachePtr;
         private MemoryChunk chunk;
         private File outputFile;
+        private long maxServerCacheTTL;
         
         
         public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
                 ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
+            maxServerCacheTTL = services.getProps().getInt(
+                    QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
             this.id = id;
-            this.servers = new HashSet<HRegionLocation>(servers);
+            this.servers = new HashMap();
+            long currentTime = System.currentTimeMillis();
+            for(HRegionLocation loc : servers) {
+                this.servers.put(loc, currentTime);
+            }
             this.size =  cachePtr.getLength();
             if (storeCacheOnClient) {
                 try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
         public byte[] getId() {
             return id;
         }
-        
-		public boolean addServer(HRegionLocation loc) {
-			return this.servers.add(loc);
-		}
+
+        public boolean addServer(HRegionLocation loc) {
+            if(this.servers.containsKey(loc)) {
+                return false;
+            } else {
+                this.servers.put(loc, System.currentTimeMillis());
+                return true;
+            }
+        }
+
+        public boolean isExpired(HRegionLocation loc) {
+            if(this.servers.containsKey(loc)) {
+                Long time = this.servers.get(loc);
+                if(System.currentTimeMillis() - time > maxServerCacheTTL)
+                    return true; // cache was sent more than maxTTL ms ago, so assume it has expired on the server
+            } else {
+                return false; // Unknown region location: nothing was sent there yet, so the cache still needs to be sent.
+            }
+            return false; // Sent less than maxTTL ms ago, so the cache should still be on the server.
+        }
+
+
         
         /**
          * Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
         @Override
         public void close() throws SQLException {
             try{
-                removeServerCache(this, servers);
+                removeServerCache(this, servers.keySet());
             }finally{
                 cachePtr = null;
                 if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
     
     /**
      * Remove the cached table from all region servers
-     * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
-     * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
      * @throws SQLException
      * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
      */
@@ -421,6 +446,9 @@ public class ServerCacheClient {
             byte[] tableName = pTable.getPhysicalName().getBytes();
             table = services.getTable(tableName);
             HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+            if(cache.isExpired(tableRegionLocation)) {
+                return false;
+            }
 			if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
 				success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
 						txState);


[4/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side

Posted by ss...@apache.org.
PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/033a2fc2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/033a2fc2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/033a2fc2

Branch: refs/heads/master
Commit: 033a2fc2a91052a6db94da55d87c173d4dbdabab
Parents: 764eb8f
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Sep 27 12:09:25 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
 .../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
 2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/033a2fc2/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 76f45e2..cebb9ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
 @RunWith(Parameterized.class)
 public class HashJoinCacheIT extends HashJoinIT {
     
@@ -426,7 +436,27 @@ public class HashJoinCacheIT extends HashJoinIT {
 	public void testUpsertWithJoin() throws Exception {
 		// TODO: We will enable this test once PHOENIX-3163
 	}
-    
+
+    @Test
+    public void testExpiredCache() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1"); // TTL of "1" (ms, per the attribute name) - presumably so the cache expires almost immediately
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+        String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+        String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " +
+                tableName1 + " supp RIGHT JOIN " + tableName2 +
+                " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            rs.next();
+            fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+        } catch (HashJoinCacheNotFoundException e) {
+            // Expected: the join query must surface HashJoinCacheNotFoundException; any other outcome fails the test
+        }
+    }
+
     public static class InvalidateHashCache extends SimpleRegionObserver {
         public static Random rand= new Random();
         public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/033a2fc2/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
     public class ServerCache implements SQLCloseable {
         private final int size;
         private final byte[] id;
-        private final Set<HRegionLocation> servers;
+        private final Map<HRegionLocation, Long> servers;
         private ImmutableBytesWritable cachePtr;
         private MemoryChunk chunk;
         private File outputFile;
+        private long maxServerCacheTTL;
         
         
         public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
                 ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
+            maxServerCacheTTL = services.getProps().getInt(
+                    QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
             this.id = id;
-            this.servers = new HashSet<HRegionLocation>(servers);
+            this.servers = new HashMap();
+            long currentTime = System.currentTimeMillis();
+            for(HRegionLocation loc : servers) {
+                this.servers.put(loc, currentTime);
+            }
             this.size =  cachePtr.getLength();
             if (storeCacheOnClient) {
                 try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
         public byte[] getId() {
             return id;
         }
-        
-		public boolean addServer(HRegionLocation loc) {
-			return this.servers.add(loc);
-		}
+
+        public boolean addServer(HRegionLocation loc) { // starts tracking loc; returns true only if it was not tracked before
+            if(this.servers.containsKey(loc)) {
+                return false; // already tracked: the originally recorded timestamp is left untouched
+            } else {
+                this.servers.put(loc, System.currentTimeMillis()); // remember when this location was added
+                return true;
+            }
+        }
+
+        public boolean isExpired(HRegionLocation loc) { // true only if loc is tracked and its timestamp is older than maxServerCacheTTL
+            if(this.servers.containsKey(loc)) {
+                Long time = this.servers.get(loc);
+                if(System.currentTimeMillis() - time > maxServerCacheTTL)
+                    return true; // cache was sent more than maxServerCacheTTL ms ago, so expect that it has expired
+            } else {
+                return false; // Unknown region location: no timestamp is recorded for it, so it cannot be reported as expired.
+            }
+            return false; // Tracked and still within maxServerCacheTTL: the cache should still be on the server.
+        }
+
+
         
         /**
          * Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
         @Override
         public void close() throws SQLException {
             try{
-                removeServerCache(this, servers);
+                removeServerCache(this, servers.keySet());
             }finally{
                 cachePtr = null;
                 if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
     
     /**
      * Remove the cached table from all region servers
-     * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
-     * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
      * @throws SQLException
      * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
      */
@@ -421,6 +446,9 @@ public class ServerCacheClient {
             byte[] tableName = pTable.getPhysicalName().getBytes();
             table = services.getTable(tableName);
             HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+            if(cache.isExpired(tableRegionLocation)) {
+                return false;
+            }
 			if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
 				success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
 						txState);