You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/12/14 20:31:26 UTC

[3/3] phoenix git commit: PHOENIX-2519 Prevent RPC for SYSTEM tables when querying

PHOENIX-2519 Prevent RPC for SYSTEM tables when querying


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/470b7dd8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/470b7dd8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/470b7dd8

Branch: refs/heads/master
Commit: 470b7dd8f8d709a175d8606251efb4e7be3a59f5
Parents: e8a7fee
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Dec 13 14:31:00 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Dec 14 11:31:15 2015 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/rpc/UpdateCacheIT.java   | 46 +++++++++++++++-----
 .../phoenix/rpc/UpdateCacheWithScnIT.java       |  2 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   | 12 ++++-
 3 files changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/470b7dd8/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
index 3b3c6c7..208af94 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -78,26 +79,49 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT {
         ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
     }
 
+    private static void setupSystemTable(Long scn) throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        if (scn != null) {
+            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(
+            "create table " + QueryConstants.SYSTEM_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR + MUTABLE_INDEX_DATA_TABLE + TEST_TABLE_SCHEMA);
+        }
+    }
+    
     @Test
     public void testUpdateCacheForTxnTable() throws Exception {
-        helpTestUpdateCache(true, null);
+        helpTestUpdateCache(true, false, null);
     }
     
     @Test
     public void testUpdateCacheForNonTxnTable() throws Exception {
-        helpTestUpdateCache(false, null);
+        helpTestUpdateCache(false, false, null);
     }
 	
-	public static void helpTestUpdateCache(boolean isTransactional, Long scn) throws Exception {
+    @Test
+    public void testUpdateCacheForNonTxnSystemTable() throws Exception {
+        helpTestUpdateCache(false, true, null);
+    }
+    
+	public static void helpTestUpdateCache(boolean isTransactional, boolean isSystem, Long scn) throws Exception {
 	    String tableName = isTransactional ? TRANSACTIONAL_DATA_TABLE : MUTABLE_INDEX_DATA_TABLE;
-	    String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName;
+	    String schemaName;
+	    if (isSystem) {
+	        setupSystemTable(scn);
+	        schemaName = QueryConstants.SYSTEM_SCHEMA_NAME;
+	    } else {
+	        schemaName = INDEX_DATA_SCHEMA;
+	    }
+	    String fullTableName = schemaName + QueryConstants.NAME_SEPARATOR + tableName;
 		String selectSql = "SELECT * FROM "+fullTableName;
 		// use a spyed ConnectionQueryServices so we can verify calls to getTable
 		ConnectionQueryServices connectionQueryServices = Mockito.spy(driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)));
 		Properties props = new Properties();
 		props.putAll(PhoenixEmbeddedDriver.DEFFAULT_PROPS.asMap());
 		if (scn!=null) {
-            props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+            props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+10));
         }
 		Connection conn = connectionQueryServices.connect(getUrl(), props);
 		try {
@@ -112,13 +136,14 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT {
 			TestUtil.setRowKeyColumns(stmt, 3);
 			stmt.execute();
 			conn.commit();
-			// verify only one rpc to fetch table metadata, 
-            verify(connectionQueryServices).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
+            int numUpsertRpcs = isSystem ? 0 : 1; 
+			// verify only 0 or 1 rpc to fetch table metadata, 
+            verify(connectionQueryServices, times(numUpsertRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(schemaName)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
             reset(connectionQueryServices);
             
             if (scn!=null) {
                 // advance scn so that we can see the data we just upserted
-                props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+2));
+                props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+20));
                 conn = connectionQueryServices.connect(getUrl(), props);
             }
 			
@@ -143,8 +168,9 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT {
 	        // for non-transactional tables without a scn : verify one rpc to getTable occurs *per* query
             // for non-transactional tables with a scn : verify *only* one rpc occurs
             // for transactional tables : verify *only* one rpc occurs
-            int numRpcs = isTransactional || scn!=null ? 1 : 3; 
-            verify(connectionQueryServices, times(numRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
+	        // for non-transactional, system tables : verify non rpc occurs
+            int numRpcs = isSystem ? 0 : (isTransactional || scn!=null ? 1 : 3); 
+            verify(connectionQueryServices, times(numRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(schemaName)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
 		}
         finally {
         	conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/470b7dd8/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
index dbc7fd1..5ff2fb0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
@@ -35,7 +35,7 @@ public class UpdateCacheWithScnIT extends BaseClientManagedTimeIT {
 	
 	@Test
 	public void testUpdateCacheWithScn() throws Exception {
-		UpdateCacheIT.helpTestUpdateCache(false, ts+2);
+		UpdateCacheIT.helpTestUpdateCache(false, false, ts+2);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/470b7dd8/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 0bdb65a..2aef7f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -58,6 +59,7 @@ import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
@@ -203,7 +205,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
         // Set miscellaneous scan attributes. This is the last chance to set them before we
         // clone the scan for each parallelized chunk.
         Scan scan = context.getScan();
-        PTable table = context.getCurrentTable().getTable();
+        TableRef tableRef = context.getCurrentTable();
+        PTable table = tableRef.getTable();
         
         if (dynamicFilter != null) {
             WhereCompiler.compile(context, statement, null, Collections.singletonList(dynamicFilter), false, null);            
@@ -231,7 +234,12 @@ public abstract class BaseQueryPlan implements QueryPlan {
 	        TimeRange scanTimeRange = scan.getTimeRange();
 	        Long scn = connection.getSCN();
 	        if (scn == null) {
-	            scn = context.getCurrentTime();
+	            // If we haven't resolved the time at the beginning of compilation, don't
+	            // force the lookup on the server, but use HConstants.LATEST_TIMESTAMP instead.
+	            scn = tableRef.getTimeStamp();
+	            if (scn == QueryConstants.UNSET_TIMESTAMP) {
+	                scn = HConstants.LATEST_TIMESTAMP;
+	            }
 	        }
 	        try {
 	            TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn);