You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/07 09:40:33 UTC
[2/4] git commit: Atomically increment table timestamp and fix unit
tests
Atomically increment table timestamp and fix unit tests
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/712ce1ef
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/712ce1ef
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/712ce1ef
Branch: refs/heads/3.0
Commit: 712ce1efba5b27ed23f9c2f8c42b9adacd48e316
Parents: 12fa6f7
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Oct 6 19:47:29 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Oct 6 19:47:29 2014 -0700
----------------------------------------------------------------------
.../end2end/BaseTenantSpecificTablesIT.java | 2 +-
.../end2end/TenantSpecificTablesDMLIT.java | 16 ++-
.../coprocessor/MetaDataEndpointImpl.java | 72 +++++++-----
.../phoenix/coprocessor/MetaDataProtocol.java | 2 +-
.../phoenix/query/ConnectionQueryServices.java | 2 +-
.../query/ConnectionQueryServicesImpl.java | 4 +-
.../query/ConnectionlessQueryServicesImpl.java | 2 +-
.../query/DelegateConnectionQueryServices.java | 4 +-
.../apache/phoenix/schema/MetaDataClient.java | 18 ++-
.../phoenix/schema/stat/StatisticsTable.java | 4 +-
.../phoenix/schema/stat/StatisticsUtil.java | 111 +++++++++++++++++++
.../phoenix/schema/stat/StatisticsUtils.java | 111 -------------------
12 files changed, 187 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
index b8fa035..fe08ac5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
@@ -84,7 +84,7 @@ public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT
public static void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
// Must update config before starting server
- props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20l));
+ props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
PHOENIX_JDBC_TENANT_SPECIFIC_URL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID;
PHOENIX_JDBC_TENANT_SPECIFIC_URL2 = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID2;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
index 4b20979..7d172da 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
@@ -53,6 +53,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
analyzeTable(conn, TENANT_TABLE_NAME);
+ conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
ResultSet rs = conn.createStatement().executeQuery("select tenant_col from " + TENANT_TABLE_NAME + " where id = 1");
assertTrue("Expected 1 row in result set", rs.next());
assertEquals("Cheap Sunglasses", rs.getString(1));
@@ -75,6 +76,8 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn1.commit();
conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
analyzeTable(conn1, TENANT_TABLE_NAME);
+ conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+
conn2.setAutoCommit(true);
conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('them','" + TENANT_TYPE_ID + "',1,'Long Hair')");
conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('us','" + TENANT_TYPE_ID + "',2,'Black Hat')");
@@ -91,6 +94,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn1.close();
conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
analyzeTable(conn2, TENANT_TABLE_NAME);
+ conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
rs = conn2.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME + " where id = 2");
assertTrue("Expected 1 row in result set", rs.next());
assertEquals(2, rs.getInt(3));
@@ -141,6 +145,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
assertTrue("Expected 1 row in result set", rs.next());
assertEquals(2, rs.getInt(3));
assertEquals("Viva Las Vegas", rs.getString(4));
+
List<KeyRange> splits = getAllSplits(conn1, TENANT_TABLE_NAME);
assertEquals(3, splits.size());
}
@@ -177,6 +182,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
analyzeTable(conn, TENANT_TABLE_NAME);
+ conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
ResultSet rs = conn.createStatement().executeQuery("select tenant_col from " + TENANT_TABLE_NAME + " join foo on k=id");
assertTrue("Expected 1 row in result set", rs.next());
assertEquals("Cheap Sunglasses", rs.getString(1));
@@ -248,6 +254,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn = nextConnection(getUrl());
analyzeTable(conn, PARENT_TABLE_NAME);
+ conn = nextConnection(getUrl());
rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME);
rs.next();
assertEquals(2, rs.getInt(1));
@@ -308,6 +315,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
analyzeTable(conn, PARENT_TABLE_NAME);
+ conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
conn.createStatement().execute("delete from " + TENANT_TABLE_NAME);
conn.commit();
conn.close();
@@ -343,6 +351,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn = nextConnection(getUrl());
analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
+ conn = nextConnection(getUrl());
ResultSet rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
rs.next();
assertEquals(3, rs.getInt(1));
@@ -368,8 +377,9 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
- conn.setAutoCommit(true);
analyzeTable(conn, TENANT_TABLE_NAME);
+ conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+ conn.setAutoCommit(true);
int count = conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + "(id, user) select id+100, user from " + TENANT_TABLE_NAME);
assertEquals("Expected 1 row to have been inserted", 1, count);
conn.close();
@@ -403,8 +413,9 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
- conn.setAutoCommit(true);
analyzeTable(conn, TENANT_TABLE_NAME);
+ conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+ conn.setAutoCommit(true);
int count = conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + "(id, user) select id+100, user from ANOTHER_TENANT_TABLE where id=2");
assertEquals("Expected 1 row to have been inserted", 1, count);
conn.close();
@@ -452,6 +463,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
analyzeTable(conn, PARENT_TABLE_NAME);
+ conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
rs = conn.createStatement().executeQuery("select user from " + PARENT_TABLE_NAME);
assertTrue(rs.next());
assertEquals(rs.getString(1),"Billy Gibbons");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index c21dd2e..1826299 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -106,7 +106,7 @@ import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.stat.PTableStats;
-import org.apache.phoenix.schema.stat.StatisticsUtils;
+import org.apache.phoenix.schema.stat.StatisticsUtil;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
@@ -462,7 +462,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
if (tenantId == null) {
HTableInterface statsHTable = getEnvironment().getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
try {
- StatisticsUtils.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
+ stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
logger.warn(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " not online yet?");
} finally {
@@ -1118,7 +1118,38 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
}
}
+ private PTable incrementTableTimestamp(byte[] key, long clientTimeStamp) throws IOException, SQLException {
+ RegionCoprocessorEnvironment env = getEnvironment();
+ HRegion region = env.getRegion();
+ Integer lid = region.getLock(null, key, true);
+ if (lid == null) {
+ throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
+ }
+ try {
+ PTable table = doGetTable(key, clientTimeStamp, lid);
+ if (table != null) {
+ long tableTimeStamp = table.getTimeStamp() + 1;
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(1);
+ Put p = new Put(key);
+ p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, tableTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+ mutations.add(p);
+ region.mutateRowsWithLocks(mutations, Collections.<byte[]> emptySet());
+
+ Cache<ImmutableBytesPtr, PTable> metaDataCache = GlobalCache.getInstance(getEnvironment()).getMetaDataCache();
+ ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+ metaDataCache.invalidate(cacheKey);
+ }
+ return table;
+ } finally {
+ region.releaseRowLock(lid);
+ }
+ }
+
private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
+ return doGetTable(key, clientTimeStamp, null);
+ }
+
+ private PTable doGetTable(byte[] key, long clientTimeStamp, Integer lid) throws IOException, SQLException {
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
PTable table = metaDataCache.getIfPresent(cacheKey);
@@ -1143,9 +1174,12 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
* This will just prevent a table from getting rebuilt
* too often.
*/
- Integer lid = region.getLock(null, key, true);
- if (lid == null) {
- throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
+ final boolean wasLocked = (lid != null);
+ if (!wasLocked) {
+ lid = region.getLock(null, key, true);
+ if (lid == null) {
+ throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
+ }
}
try {
// Try cache again in case we were waiting on a lock
@@ -1168,7 +1202,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
// Otherwise, query for an older version of the table - it won't be cached
return buildTable(key, cacheKey, region, clientTimeStamp);
} finally {
- if (lid != null) region.releaseRowLock(lid);
+ if (!wasLocked) region.releaseRowLock(lid);
}
}
@@ -1374,31 +1408,13 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
}
@Override
- public void clearCacheForTable(byte[] tenantId, byte[] schema, byte[] tableName, final long clientTS)
+ public void incrementTableTimeStamp(byte[] tenantId, byte[] schemaName, byte[] tableName, final long clientTimeStamp)
throws IOException {
- byte[] tableKey = SchemaUtil.getTableKey(tenantId, schema, tableName);
- ImmutableBytesPtr key = new ImmutableBytesPtr(tableKey);
try {
- PTable table = doGetTable(tableKey, clientTS);
- if (table != null) {
- Cache<ImmutableBytesPtr, PTable> metaDataCache = GlobalCache.getInstance(getEnvironment()).getMetaDataCache();
- // Add +1 to the ts
- // TODO : refactor doGetTable() to do this - optionally increment the timestamp
- // TODO : clear metadata if it is spread across multiple region servers
- long ts = table.getTimeStamp() + 1;
- // Here we could add an empty puti
- HRegion region = getEnvironment().getRegion();
- List<Mutation> mutations = new ArrayList<Mutation>();
- Put p = new Put(tableKey);
- p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ts, ByteUtil.EMPTY_BYTE_ARRAY);
- mutations.add(p);
- region.mutateRowsWithLocks(mutations, Collections.<byte[]> emptySet());
- metaDataCache.invalidate(key);
- }
+ byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+ incrementTableTimestamp(tableKey, clientTimeStamp);
} catch (Throwable t) {
- // We could still invalidate it
- logger.error("clearCacheForTable failed to update the latest ts ", t);
- throw new IOException(t);
+ ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 66fb1ac..606243d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -267,7 +267,7 @@ public interface MetaDataProtocol extends CoprocessorProtocol {
*/
void clearCache();
- void clearCacheForTable(final byte[] tenantID, final byte[] schema, final byte[] tableName, final long clientTS)
+ void incrementTableTimeStamp(byte[] tenantId, byte[] schema, byte[] tableName, long clientTimestamp)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 26e4809..2b4a109 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -103,5 +103,5 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public boolean supportsFeature(Feature feature);
public String getUserName();
- public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, long clientTS) throws SQLException;
+ public void incrementTableTimeStamp(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, long clientTS) throws SQLException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b3875cb..7b8d79c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -1807,7 +1807,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
+ public void incrementTableTimeStamp(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
final long clientTS) throws SQLException {
// clear the meta data cache for the table here
try {
@@ -1819,7 +1819,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() {
@Override
public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException {
- instance.clearCacheForTable(tenantId, schemaName, tableName, clientTS);
+ instance.incrementTableTimeStamp(tenantId, schemaName, tableName, clientTS);
// TODO : Should this really return a result?Return null
return new MetaDataMutationResult();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 70e656a..3e4643f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -183,7 +183,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
+ public void incrementTableTimeStamp(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
throws SQLException {}
// TODO: share this with ConnectionQueryServicesImpl
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 8bd2c61..86523fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -228,8 +228,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
}
@Override
- public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
+ public void incrementTableTimeStamp(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
throws SQLException {
- getDelegate().clearCacheForTable(tenantId, schemaName, tableName, clientTS);
+ getDelegate().incrementTableTimeStamp(tenantId, schemaName, tableName, clientTS);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 3e1b5bb..c0947b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -482,20 +482,17 @@ public class MetaDataClient {
Long scn = connection.getSCN();
// Always invalidate the cache
long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
- connection.getQueryServices().clearCacheForTable(tenantIdBytes,
- Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(physicalName.getString())),
- Bytes.toBytes(SchemaUtil.getTableNameFromFullName(physicalName.getString())), clientTS);
- String query = "SELECT CURRENT_DATE(),"+ LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME
+ String query = "SELECT CURRENT_DATE() - " + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME
+ " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY
- + " IS NULL AND " + REGION_NAME + " IS NULL";
+ + " IS NULL AND " + REGION_NAME + " IS NULL AND " + LAST_STATS_UPDATE_TIME + " IS NOT NULL";
ResultSet rs = connection.createStatement().executeQuery(query);
long msSinceLastUpdate = Long.MAX_VALUE;
- if (rs.next() && rs.getDate(2) != null) {
- msSinceLastUpdate = rs.getDate(1).getTime() - rs.getDate(2).getTime();
+ if (rs.next()) {
+ msSinceLastUpdate = rs.getLong(1);
}
if (msSinceLastUpdate >= msMinBetweenUpdates) {
// Here create the select query.
- String countQuery = "SELECT /*+ NO_CACHE */ count(*) FROM " + table.getName().getString();
+ String countQuery = "SELECT /*+ NO_CACHE NO_INDEX */ count(*) FROM " + table.getName().getString();
PhoenixStatement statement = (PhoenixStatement) connection.createStatement();
QueryPlan plan = statement.compileQuery(countQuery);
Scan scan = plan.getContext().getScan();
@@ -510,8 +507,9 @@ public class MetaDataClient {
tempPtr.set(kv.getValue());
// A single Cell will be returned with the count(*) - we decode that here
long rowCount = PDataType.LONG.getCodec().decodeLong(tempPtr, SortOrder.getDefault());
- // We need to update the stats table
- connection.getQueryServices().clearCacheForTable(tenantIdBytes,
+ // We need to update the stats table so that client will pull the new one with
+ // the updated stats.
+ connection.getQueryServices().incrementTableTimeStamp(tenantIdBytes,
Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(physicalName.getString())),
Bytes.toBytes(SchemaUtil.getTableNameFromFullName(physicalName.getString())), clientTS);
return new MutationState(0, connection, rowCount);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index 74bb189..6c9b8f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -99,7 +99,7 @@ public class StatisticsTable implements Closeable {
public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
if (tracker == null) { return; }
- byte[] prefix = StatisticsUtils.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
+ byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
PDataType.VARCHAR.toBytes(regionName));
Put put = new Put(prefix);
if (tracker.getGuidePosts(fam) != null) {
@@ -138,7 +138,7 @@ public class StatisticsTable implements Closeable {
public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
throws IOException {
- byte[] prefix = StatisticsUtils.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
+ byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
PDataType.VARCHAR.toBytes(regionName));
mutations.add(new Delete(prefix, clientTimeStamp - 1));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtil.java
new file mode 100644
index 0000000..2086d31
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtil.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stat;
+import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PhoenixArray;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+
+import com.google.common.collect.Lists;
+/**
+ * Simple utility class for managing multiple key parts of the statistic
+ */
+public class StatisticsUtil {
+ private StatisticsUtil() {
+ // private ctor for utility classes
+ }
+
+ /** Number of parts in our complex key */
+ protected static final int NUM_KEY_PARTS = 3;
+
+ public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) {
+ // always starts with the source table
+ byte[] rowKey = new byte[table.length + fam.length + region.length + 2];
+ int offset = 0;
+ System.arraycopy(table, 0, rowKey, offset, table.length);
+ offset += table.length;
+ rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
+ System.arraycopy(fam, 0, rowKey, offset, fam.length);
+ offset += fam.length;
+ rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
+ System.arraycopy(region, 0, rowKey, offset, region.length);
+ return rowKey;
+ }
+
+ public static PTableStats readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, long clientTimeStamp) throws IOException {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ Scan s = MetaDataUtil.newTableRowsScan(tableNameBytes, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);
+ s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
+ ResultScanner scanner = statsHTable.getScanner(s);
+ try {
+ Result result = null;
+ TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
+ while ((result = scanner.next()) != null) {
+ KeyValue current = result.raw()[0];
+ int tableNameLength = tableNameBytes.length + 1;
+ int cfOffset = current.getRowOffset() + tableNameLength;
+ int cfLength = getVarCharLength(current.getBuffer(), cfOffset, current.getRowLength() - tableNameLength);
+ ptr.set(current.getBuffer(), cfOffset, cfLength);
+ byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getBuffer(), current.getValueOffset(), current
+ .getValueLength());
+ if (array != null && array.getDimensions() != 0) {
+ List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions());
+ for (int j = 0; j < array.getDimensions(); j++) {
+ byte[] gp = array.toBytes(j);
+ if (gp.length != 0) {
+ guidePosts.add(gp);
+ }
+ }
+ List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts);
+ if (gps != null) { // Add guidepost already there from other regions
+ guidePosts.addAll(gps);
+ }
+ }
+ }
+ if (!guidePostsPerCf.isEmpty()) {
+ // Sort guideposts, as the order above will depend on the order we traverse
+ // each region's worth of guideposts above.
+ for (List<byte[]> gps : guidePostsPerCf.values()) {
+ Collections.sort(gps, Bytes.BYTES_COMPARATOR);
+ }
+ return new PTableStatsImpl(guidePostsPerCf);
+ }
+ } finally {
+ scanner.close();
+ }
+ return PTableStatsImpl.NO_STATS;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/712ce1ef/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
deleted file mode 100644
index e2eb791..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PhoenixArray;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.MetaDataUtil;
-
-import com.google.common.collect.Lists;
-/**
- * Simple utility class for managing multiple key parts of the statistic
- */
-public class StatisticsUtils {
- private StatisticsUtils() {
- // private ctor for utility classes
- }
-
- /** Number of parts in our complex key */
- protected static final int NUM_KEY_PARTS = 3;
-
- public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) {
- // always starts with the source table
- byte[] rowKey = new byte[table.length + fam.length + region.length + 2];
- int offset = 0;
- System.arraycopy(table, 0, rowKey, offset, table.length);
- offset += table.length;
- rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
- System.arraycopy(fam, 0, rowKey, offset, fam.length);
- offset += fam.length;
- rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
- System.arraycopy(region, 0, rowKey, offset, region.length);
- return rowKey;
- }
-
- public static PTableStats readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, long clientTimeStamp) throws IOException {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- Scan s = MetaDataUtil.newTableRowsScan(tableNameBytes, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);
- s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
- ResultScanner scanner = statsHTable.getScanner(s);
- try {
- Result result = null;
- TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
- while ((result = scanner.next()) != null) {
- KeyValue current = result.raw()[0];
- int tableNameLength = tableNameBytes.length + 1;
- int cfOffset = current.getRowOffset() + tableNameLength;
- int cfLength = getVarCharLength(current.getBuffer(), cfOffset, current.getRowLength() - tableNameLength);
- ptr.set(current.getBuffer(), cfOffset, cfLength);
- byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
- PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getBuffer(), current.getValueOffset(), current
- .getValueLength());
- if (array != null && array.getDimensions() != 0) {
- List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions());
- for (int j = 0; j < array.getDimensions(); j++) {
- byte[] gp = array.toBytes(j);
- if (gp.length != 0) {
- guidePosts.add(gp);
- }
- }
- List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts);
- if (gps != null) { // Add guidepost already there from other regions
- guidePosts.addAll(gps);
- }
- }
- }
- if (!guidePostsPerCf.isEmpty()) {
- // Sort guideposts, as the order above will depend on the order we traverse
- // each region's worth of guideposts above.
- for (List<byte[]> gps : guidePostsPerCf.values()) {
- Collections.sort(gps, Bytes.BYTES_COMPARATOR);
- }
- return new PTableStatsImpl(guidePostsPerCf);
- }
- } finally {
- scanner.close();
- }
- return PTableStatsImpl.NO_STATS;
- }
-}
\ No newline at end of file