You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/12/14 20:31:26 UTC
[3/3] phoenix git commit: PHOENIX-2519 Prevent RPC for SYSTEM tables
when querying
PHOENIX-2519 Prevent RPC for SYSTEM tables when querying
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/470b7dd8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/470b7dd8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/470b7dd8
Branch: refs/heads/master
Commit: 470b7dd8f8d709a175d8606251efb4e7be3a59f5
Parents: e8a7fee
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Dec 13 14:31:00 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Dec 14 11:31:15 2015 -0800
----------------------------------------------------------------------
.../org/apache/phoenix/rpc/UpdateCacheIT.java | 46 +++++++++++++++-----
.../phoenix/rpc/UpdateCacheWithScnIT.java | 2 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 12 ++++-
3 files changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/470b7dd8/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
index 3b3c6c7..208af94 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -78,26 +79,49 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT {
ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
}
+ private static void setupSystemTable(Long scn) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ if (scn != null) {
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(
+ "create table " + QueryConstants.SYSTEM_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR + MUTABLE_INDEX_DATA_TABLE + TEST_TABLE_SCHEMA);
+ }
+ }
+
@Test
public void testUpdateCacheForTxnTable() throws Exception {
- helpTestUpdateCache(true, null);
+ helpTestUpdateCache(true, false, null);
}
@Test
public void testUpdateCacheForNonTxnTable() throws Exception {
- helpTestUpdateCache(false, null);
+ helpTestUpdateCache(false, false, null);
}
- public static void helpTestUpdateCache(boolean isTransactional, Long scn) throws Exception {
+ @Test
+ public void testUpdateCacheForNonTxnSystemTable() throws Exception {
+ helpTestUpdateCache(false, true, null);
+ }
+
+ public static void helpTestUpdateCache(boolean isTransactional, boolean isSystem, Long scn) throws Exception {
String tableName = isTransactional ? TRANSACTIONAL_DATA_TABLE : MUTABLE_INDEX_DATA_TABLE;
- String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName;
+ String schemaName;
+ if (isSystem) {
+ setupSystemTable(scn);
+ schemaName = QueryConstants.SYSTEM_SCHEMA_NAME;
+ } else {
+ schemaName = INDEX_DATA_SCHEMA;
+ }
+ String fullTableName = schemaName + QueryConstants.NAME_SEPARATOR + tableName;
String selectSql = "SELECT * FROM "+fullTableName;
// use a spyed ConnectionQueryServices so we can verify calls to getTable
ConnectionQueryServices connectionQueryServices = Mockito.spy(driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)));
Properties props = new Properties();
props.putAll(PhoenixEmbeddedDriver.DEFFAULT_PROPS.asMap());
if (scn!=null) {
- props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+10));
}
Connection conn = connectionQueryServices.connect(getUrl(), props);
try {
@@ -112,13 +136,14 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT {
TestUtil.setRowKeyColumns(stmt, 3);
stmt.execute();
conn.commit();
- // verify only one rpc to fetch table metadata,
- verify(connectionQueryServices).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
+ int numUpsertRpcs = isSystem ? 0 : 1;
+ // verify only 0 or 1 rpc to fetch table metadata,
+ verify(connectionQueryServices, times(numUpsertRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(schemaName)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
reset(connectionQueryServices);
if (scn!=null) {
// advance scn so that we can see the data we just upserted
- props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+2));
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+20));
conn = connectionQueryServices.connect(getUrl(), props);
}
@@ -143,8 +168,9 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT {
// for non-transactional tables without a scn : verify one rpc to getTable occurs *per* query
// for non-transactional tables with a scn : verify *only* one rpc occurs
// for transactional tables : verify *only* one rpc occurs
- int numRpcs = isTransactional || scn!=null ? 1 : 3;
- verify(connectionQueryServices, times(numRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
+ // for non-transactional, system tables : verify non rpc occurs
+ int numRpcs = isSystem ? 0 : (isTransactional || scn!=null ? 1 : 3);
+ verify(connectionQueryServices, times(numRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(schemaName)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
}
finally {
conn.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/470b7dd8/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
index dbc7fd1..5ff2fb0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
@@ -35,7 +35,7 @@ public class UpdateCacheWithScnIT extends BaseClientManagedTimeIT {
@Test
public void testUpdateCacheWithScn() throws Exception {
- UpdateCacheIT.helpTestUpdateCache(false, ts+2);
+ UpdateCacheIT.helpTestUpdateCache(false, false, ts+2);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/470b7dd8/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 0bdb65a..2aef7f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
@@ -58,6 +59,7 @@ import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
@@ -203,7 +205,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
// Set miscellaneous scan attributes. This is the last chance to set them before we
// clone the scan for each parallelized chunk.
Scan scan = context.getScan();
- PTable table = context.getCurrentTable().getTable();
+ TableRef tableRef = context.getCurrentTable();
+ PTable table = tableRef.getTable();
if (dynamicFilter != null) {
WhereCompiler.compile(context, statement, null, Collections.singletonList(dynamicFilter), false, null);
@@ -231,7 +234,12 @@ public abstract class BaseQueryPlan implements QueryPlan {
TimeRange scanTimeRange = scan.getTimeRange();
Long scn = connection.getSCN();
if (scn == null) {
- scn = context.getCurrentTime();
+ // If we haven't resolved the time at the beginning of compilation, don't
+ // force the lookup on the server, but use HConstants.LATEST_TIMESTAMP instead.
+ scn = tableRef.getTimeStamp();
+ if (scn == QueryConstants.UNSET_TIMESTAMP) {
+ scn = HConstants.LATEST_TIMESTAMP;
+ }
}
try {
TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn);