Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/09/20 01:43:29 UTC
[2/5] PHOENIX-180 Use stats to guide query parallelization (Ramkrishna S Vasudevan)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 7e1e0a5..987d200 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -79,6 +79,8 @@ import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheForTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheForTableResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
@@ -90,6 +92,9 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRespons
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService;
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -123,6 +128,7 @@ import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.stat.StatisticsCollector;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.ConfigUtil;
@@ -141,6 +147,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.HBaseZeroCopyByteString;
+import com.google.protobuf.ServiceException;
public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
@@ -153,7 +160,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final ReadOnlyProps props;
private final String userName;
private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
- private final StatsManager statsManager;
// Cache the latest meta data here for future connections
// writes guarded by "latestMetaDataLock"
@@ -211,10 +217,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// TODO: should we track connection wide memory usage or just org-wide usage?
// If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate
this.childServices = new ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices>(INITIAL_CHILD_SERVICES_CAPACITY);
- int statsUpdateFrequencyMs = this.getProps().getInt(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS);
- int maxStatsAgeMs = this.getProps().getInt(QueryServices.MAX_STATS_AGE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_STATS_AGE_MS);
- this.statsManager = new StatsManagerImpl(this, statsUpdateFrequencyMs, maxStatsAgeMs);
-
// find the HBase version and use that to determine the KeyValueBuilder that should be used
String hbaseVersion = VersionInfo.getVersion();
this.kvBuilder = KeyValueBuilder.get(hbaseVersion);
@@ -242,11 +244,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public StatsManager getStatsManager() {
- return this.statsManager;
- }
-
- @Override
public HTableInterface getTable(byte[] tableName) throws SQLException {
try {
return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, getExecutor());
@@ -307,42 +304,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
sqlE = e;
} finally {
try {
- // Clear any client-side caches.
- statsManager.clearStats();
- } catch (SQLException e) {
+ childServices.clear();
+ synchronized (latestMetaDataLock) {
+ latestMetaData = null;
+ latestMetaDataLock.notifyAll();
+ }
+ if (connection != null) connection.close();
+ } catch (IOException e) {
if (sqlE == null) {
- sqlE = e;
+ sqlE = ServerUtil.parseServerException(e);
} else {
- sqlE.setNextException(e);
+ sqlE.setNextException(ServerUtil.parseServerException(e));
}
} finally {
try {
- childServices.clear();
- synchronized (latestMetaDataLock) {
- latestMetaData = null;
- latestMetaDataLock.notifyAll();
- }
- if (connection != null) connection.close();
- } catch (IOException e) {
+ super.close();
+ } catch (SQLException e) {
if (sqlE == null) {
- sqlE = ServerUtil.parseServerException(e);
+ sqlE = e;
} else {
- sqlE.setNextException(ServerUtil.parseServerException(e));
+ sqlE.setNextException(e);
}
} finally {
- try {
- super.close();
- } catch (SQLException e) {
- if (sqlE == null) {
- sqlE = e;
- } else {
- sqlE.setNextException(e);
- }
- } finally {
- if (sqlE != null) {
- throw sqlE;
- }
- }
+ if (sqlE != null) { throw sqlE; }
}
}
}
@@ -615,7 +599,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
}
-
+
+ if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) {
+ descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null);
+ }
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
// don't install on the metadata table until we fix the TODO there.
@@ -1531,19 +1518,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
+
+ // TODO : Get this from a configuration
+ metaConnection.createStatement().executeUpdate(
+ QueryConstants.CREATE_STATS_TABLE_METADATA);
} catch (NewerTableAlreadyExistsException ignore) {
- // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
- // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
} catch (TableAlreadyExistsException ignore) {
- // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
- // any new columns we've added.
- String newColumns =
- MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", "
- + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", "
- + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", "
- + LIMIT_REACHED_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName();
- metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns);
}
} catch (Exception e) {
if (e instanceof SQLException) {
@@ -1873,6 +1853,92 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
+ @Override
+ public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException {
+ HTableInterface ht = null;
+ try {
+ ht = this.getTable(tableName);
+ Batch.Call<StatCollectService, StatCollectResponse> callable = new Batch.Call<StatCollectService, StatCollectResponse>() {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<StatCollectResponse> rpcCallback = new BlockingRpcCallback<StatCollectResponse>();
+
+ @Override
+ public StatCollectResponse call(StatCollectService service) throws IOException {
+ StatCollectRequest.Builder builder = StatCollectRequest.newBuilder();
+ builder.setStartRow(HBaseZeroCopyByteString.wrap(keyRange.getLowerRange()));
+ builder.setStopRow(HBaseZeroCopyByteString.wrap(keyRange.getUpperRange()));
+ service.collectStat(controller, builder.build(), rpcCallback);
+ if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+ return rpcCallback.get();
+ }
+ };
+ Map<byte[], StatCollectResponse> result = ht.coprocessorService(StatCollectService.class,
+ keyRange.getLowerRange(), keyRange.getUpperRange(), callable);
+ StatCollectResponse next = result.values().iterator().next();
+ return next.getRowsScanned();
+ } catch (ServiceException e) {
+ throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+ } catch (TableNotFoundException e) {
+ throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+ } catch (Throwable e) {
+ throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+ } finally {
+ if (ht != null) {
+ try {
+ ht.close();
+ } catch (IOException e) {
+ throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
+ final long clientTS) throws SQLException {
+ // clear the meta data cache for the table here
+ try {
+ SQLException sqlE = null;
+ HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ try {
+ htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+ new Batch.Call<MetaDataService, ClearCacheForTableResponse>() {
+ @Override
+ public ClearCacheForTableResponse call(MetaDataService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<ClearCacheForTableResponse> rpcCallback = new BlockingRpcCallback<ClearCacheForTableResponse>();
+ ClearCacheForTableRequest.Builder builder = ClearCacheForTableRequest.newBuilder();
+ builder.setTenantId(HBaseZeroCopyByteString.wrap(tenantId));
+ builder.setTableName(HBaseZeroCopyByteString.wrap(tableName));
+ builder.setSchemaName(HBaseZeroCopyByteString.wrap(schemaName));
+ builder.setClientTimestamp(clientTS);
+ instance.clearCacheForTable(controller, builder.build(), rpcCallback);
+ if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+ return rpcCallback.get();
+ }
+ });
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ } catch (Throwable e) {
+ sqlE = new SQLException(e);
+ } finally {
+ try {
+ htable.close();
+ } catch (IOException e) {
+ if (sqlE == null) {
+ sqlE = ServerUtil.parseServerException(e);
+ } else {
+ sqlE.setNextException(ServerUtil.parseServerException(e));
+ }
+ } finally {
+ if (sqlE != null) { throw sqlE; }
+ }
+ }
+ } catch (Exception e) {
+ throw new SQLException(ServerUtil.parseServerException(e));
+ }
+ }
+
@SuppressWarnings("deprecation")
@Override
public void returnSequences(List<SequenceKey> keys, long timestamp, SQLException[] exceptions) throws SQLException {
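For orientation, here is a minimal client-side sketch of exercising the two new ConnectionQueryServices entry points added above. The JDBC URL, the table name MY_TABLE, and the cast to PhoenixConnection are illustrative assumptions, not part of the patch.

import java.sql.DriverManager;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.util.ByteUtil;

public class UpdateStatsSketch {
    public static void main(String[] args) throws Exception {
        PhoenixConnection conn =
                (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix:localhost");
        ConnectionQueryServices services = conn.getQueryServices();
        byte[] physicalTableName = Bytes.toBytes("MY_TABLE");
        // Ask the StatCollectService endpoint to rebuild guide posts for every region of the table.
        long rowsScanned = services.updateStatistics(KeyRange.EVERYTHING_RANGE, physicalTableName);
        // Invalidate the server-side metadata cache so subsequent queries pick up the new guide posts.
        services.clearCacheForTable(ByteUtil.EMPTY_BYTE_ARRAY, ByteUtil.EMPTY_BYTE_ARRAY,
                physicalTableName, HConstants.LATEST_TIMESTAMP);
        System.out.println("Rows scanned while collecting stats: " + rowsScanned);
        conn.close();
    }
}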
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 61c2ef8..9fa415c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -68,7 +68,6 @@ import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
@@ -121,30 +120,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public StatsManager getStatsManager() {
- return new StatsManager() {
-
- @Override
- public byte[] getMinKey(TableRef table) {
- return HConstants.EMPTY_START_ROW;
- }
-
- @Override
- public byte[] getMaxKey(TableRef table) {
- return HConstants.EMPTY_END_ROW;
- }
-
- @Override
- public void updateStats(TableRef table) throws SQLException {
- }
-
- @Override
- public void clearStats() throws SQLException {
- }
- };
- }
-
- @Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
return Collections.singletonList(new HRegionLocation(
new HRegionInfo(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW),
@@ -214,6 +189,15 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
}
+ @Override
+ public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
+ // Noop
+ return 0;
+ }
+
+ @Override
+ public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
+ throws SQLException {}
// TODO: share this with ConnectionQueryServicesImpl
@Override
public void init(String url, Properties props) throws SQLException {
@@ -249,6 +233,15 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
// Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
}
+ try {
+ // TODO : Get this from a configuration
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_STATS_TABLE_METADATA);
+ } catch (NewerTableAlreadyExistsException ignore) {
+ // Ignore, as this will happen if the SYSTEM.STATS table already exists at this fixed
+ // timestamp.
+ // A TableAlreadyExistsException is not thrown, since the table only exists *after* this
+ // fixed timestamp.
+ }
} catch (SQLException e) {
sqlE = e;
} finally {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 0b6a399..fa01f09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -65,11 +65,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
}
@Override
- public StatsManager getStatsManager() {
- return getDelegate().getStatsManager();
- }
-
- @Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
return getDelegate().getAllTableRegions(tableName);
}
@@ -231,4 +226,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public String getUserName() {
return getDelegate().getUserName();
}
+
+ @Override
+ public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
+ return getDelegate().updateStatistics(keyRange, tableName);
+ }
+
+ @Override
+ public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
+ throws SQLException {
+ getDelegate().clearCacheForTable(tenantId, schemaName, tableName, clientTS);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index da2d487..bbc653e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -34,24 +34,31 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_KEY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_KEY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REGION_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_CATALOG;
@@ -67,6 +74,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -78,8 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
+
import java.math.BigDecimal;
import org.apache.hadoop.hbase.HConstants;
@@ -111,6 +118,7 @@ public interface QueryConstants {
public enum JoinType {INNER, LEFT_OUTER}
public final static String SYSTEM_SCHEMA_NAME = "SYSTEM";
+ public final static byte[] SYSTEM_SCHEMA_NAME_BYTES = Bytes.toBytes(SYSTEM_SCHEMA_NAME);
public final static String PHOENIX_METADATA = "table";
public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
@@ -220,6 +228,22 @@ public interface QueryConstants {
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+ public static final String CREATE_STATS_TABLE_METADATA =
+ "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" +
+ // PK columns
+ PHYSICAL_NAME + " VARCHAR NOT NULL," +
+ COLUMN_FAMILY + " VARCHAR," +
+ REGION_NAME + " VARCHAR," +
+ GUIDE_POSTS + " VARBINARY[]," +
+ MIN_KEY + " VARBINARY," +
+ MAX_KEY + " VARBINARY," +
+ LAST_STATS_UPDATE_TIME+ " DATE, "+
+ "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY ("
+ + PHYSICAL_NAME + ","
+ + COLUMN_FAMILY + ","+ REGION_NAME+"))\n" +
+ HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+
public static final String CREATE_SEQUENCE_METADATA =
"CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" +
TENANT_ID + " VARCHAR NULL," +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 9594f33..fd4152b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -68,18 +68,14 @@ public interface QueryServices extends SQLCloseable {
public static final String MAX_MEMORY_WAIT_MS_ATTRIB = "phoenix.query.maxGlobalMemoryWaitMs";
public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = "phoenix.query.maxTenantMemoryPercentage";
public static final String MAX_SERVER_CACHE_SIZE_ATTRIB = "phoenix.query.maxServerCacheBytes";
- public static final String TARGET_QUERY_CONCURRENCY_ATTRIB = "phoenix.query.targetConcurrency";
- public static final String MAX_QUERY_CONCURRENCY_ATTRIB = "phoenix.query.maxConcurrency";
public static final String DATE_FORMAT_ATTRIB = "phoenix.query.dateFormat";
public static final String NUMBER_FORMAT_ATTRIB = "phoenix.query.numberFormat";
public static final String STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.query.statsUpdateFrequency";
- public static final String MAX_STATS_AGE_MS_ATTRIB = "phoenix.query.maxStatsAge";
public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin";
public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching";
public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
- public static final String MAX_INTRA_REGION_PARALLELIZATION_ATTRIB = "phoenix.query.maxIntraRegionParallelization";
public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable";
public static final String USE_INDEXES_ATTRIB = "phoenix.query.useIndexes";
public static final String IMMUTABLE_ROWS_ATTRIB = "phoenix.mutate.immutableRows";
@@ -134,7 +130,9 @@ public interface QueryServices extends SQLCloseable {
public static final String TRACING_STATS_TABLE_NAME_ATTRIB = "phoenix.trace.statsTableName";
public static final String USE_REVERSE_SCAN_ATTRIB = "phoenix.query.useReverseScan";
-
+
+ public static final String HISTOGRAM_BYTE_DEPTH_CONF_KEY = "phoenix.guidepost.width";
+
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 605d44c..a0bc4da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,16 +24,15 @@ import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.HISTOGRAM_BYTE_DEPTH_CONF_KEY;
import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MUTATION_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
@@ -51,7 +50,6 @@ import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
@@ -75,7 +73,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
public class QueryServicesOptions {
public static final int DEFAULT_KEEP_ALIVE_MS = 60000;
public static final int DEFAULT_THREAD_POOL_SIZE = 128;
- public static final int DEFAULT_QUEUE_SIZE = 500;
+ public static final int DEFAULT_QUEUE_SIZE = 5000;
public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
public static final String DEFAULT_SPOOL_DIRECTORY = "/tmp";
@@ -146,6 +144,8 @@ public class QueryServicesOptions {
public static final String DEFAULT_TRACING_STATS_TABLE_NAME = "SYSTEM.TRACING_STATS";
public static final String DEFAULT_TRACING_FREQ = Tracing.Frequency.NEVER.getKey();
public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05;
+ public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 1024 * 1024;
+
public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
@@ -186,13 +186,10 @@ public class QueryServicesOptions {
.setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC)
.setIfUnset(MAX_SERVER_CACHE_SIZE_ATTRIB, DEFAULT_MAX_SERVER_CACHE_SIZE)
.setIfUnset(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE)
- .setIfUnset(TARGET_QUERY_CONCURRENCY_ATTRIB, DEFAULT_TARGET_QUERY_CONCURRENCY)
- .setIfUnset(MAX_QUERY_CONCURRENCY_ATTRIB, DEFAULT_MAX_QUERY_CONCURRENCY)
.setIfUnset(DATE_FORMAT_ATTRIB, DEFAULT_DATE_FORMAT)
.setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS)
.setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN)
.setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE)
- .setIfUnset(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, DEFAULT_MAX_INTRA_REGION_PARALLELIZATION)
.setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_ROW_KEY_ORDER_SALTED_TABLE)
.setIfUnset(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES)
.setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
@@ -204,6 +201,7 @@ public class QueryServicesOptions {
.setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES)
.setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE)
.setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE)
+ .setIfUnset(HISTOGRAM_BYTE_DEPTH_CONF_KEY, DEFAULT_HISTOGRAM_BYTE_DEPTH);
;
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
@@ -294,14 +292,6 @@ public class QueryServicesOptions {
return set(SCAN_CACHE_SIZE_ATTRIB, scanFetchSize);
}
- public QueryServicesOptions setMaxQueryConcurrency(int maxQueryConcurrency) {
- return set(MAX_QUERY_CONCURRENCY_ATTRIB, maxQueryConcurrency);
- }
-
- public QueryServicesOptions setTargetQueryConcurrency(int targetQueryConcurrency) {
- return set(TARGET_QUERY_CONCURRENCY_ATTRIB, targetQueryConcurrency);
- }
-
public QueryServicesOptions setDateFormat(String dateFormat) {
return set(DATE_FORMAT_ATTRIB, dateFormat);
}
@@ -310,6 +300,10 @@ public class QueryServicesOptions {
return set(STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs);
}
+ public QueryServicesOptions setHistogramDepthBytes(int depth) {
+ return set(HISTOGRAM_BYTE_DEPTH_CONF_KEY, depth);
+ }
+
public QueryServicesOptions setCallQueueRoundRobin(boolean isRoundRobin) {
return set(CALL_QUEUE_PRODUCER_ATTRIB_NAME, isRoundRobin);
}
@@ -322,10 +316,6 @@ public class QueryServicesOptions {
return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize);
}
- public QueryServicesOptions setMaxIntraRegionParallelization(int maxIntraRegionParallelization) {
- return set(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, maxIntraRegionParallelization);
- }
-
public QueryServicesOptions setRowKeyOrderSaltedTable(boolean rowKeyOrderSaltedTable) {
return set(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, rowKeyOrderSaltedTable);
}
@@ -394,11 +384,7 @@ public class QueryServicesOptions {
public int getMutateBatchSize() {
return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, DEFAULT_MUTATE_BATCH_SIZE);
}
-
- public int getMaxIntraRegionParallelization() {
- return config.getInt(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, DEFAULT_MAX_INTRA_REGION_PARALLELIZATION);
- }
-
+
public boolean isUseIndexes() {
return config.getBoolean(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES);
}
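A small sketch of setting and reading the new phoenix.guidepost.width knob. Only the key and its default come from the patch; how the server-side StatisticsCollector consumes the Configuration is an assumption here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;

public class GuidePostWidthSketch {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // A smaller byte depth produces more guide posts per region, hence more parallel chunks.
        conf.setLong(QueryServices.HISTOGRAM_BYTE_DEPTH_CONF_KEY, 512 * 1024);
        long width = conf.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_CONF_KEY,
                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
        System.out.println("Guide post byte depth: " + width);
    }
}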
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 56b6604..eafac8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -37,15 +37,19 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REGION_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -60,8 +64,10 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADAT
import static org.apache.phoenix.schema.PDataType.VARCHAR;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
@@ -123,6 +129,8 @@ import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.PrimaryKeyConstraint;
import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.parse.UpdateStatisticsStatement;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -133,6 +141,7 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -462,6 +471,55 @@ public class MetaDataClient {
return connection.getQueryServices().updateData(plan);
}
+ public MutationState updateStatistics(UpdateStatisticsStatement updateStatisticsStmt) throws SQLException {
+ // Before updating the stats, check whether the configured interval since the last update has elapsed
+ long minTimeForStatsUpdate = connection.getQueryServices().getProps()
+ .getLong(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS);
+ ColumnResolver resolver = FromCompiler.getResolver(updateStatisticsStmt, connection);
+ PTable table = resolver.getTables().get(0).getTable();
+ PName physicalName = table.getPhysicalName();
+ byte[] tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+ KeyRange analyzeRange = KeyRange.EVERYTHING_RANGE;
+ if (connection.getTenantId() != null && table.isMultiTenant()) {
+ tenantIdBytes = connection.getTenantId().getBytes();
+ // TODO remove this inner if once PHOENIX-1259 is fixed.
+ if (table.getBucketNum() == null && table.getIndexType() != IndexType.LOCAL) {
+ List<List<KeyRange>> tenantIdKeyRanges = Collections.singletonList(Collections.singletonList(KeyRange
+ .getKeyRange(tenantIdBytes)));
+ byte[] lowerRange = ScanUtil.getMinKey(table.getRowKeySchema(), tenantIdKeyRanges,
+ ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+ byte[] upperRange = ScanUtil.getMaxKey(table.getRowKeySchema(), tenantIdKeyRanges,
+ ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+ analyzeRange = KeyRange.getKeyRange(lowerRange, upperRange);
+ }
+ }
+ Long scn = connection.getSCN();
+ // Always invalidate the cache
+ long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
+ connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
+ table.getTableName().getBytes(), clientTS);
+ // Also clear the client-side cache, so that for cases like major compaction we can still use the stats
+ updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
+ String query = "SELECT CURRENT_DATE(),"+ LAST_STATS_UPDATE_TIME + " FROM " + SYSTEM_CATALOG_SCHEMA
+ + "." + SYSTEM_STATS_TABLE + " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY
+ + " IS NULL AND " + REGION_NAME + " IS NULL";
+ ResultSet rs = connection.createStatement().executeQuery(query);
+ long lastUpdatedTime = 0;
+ if (rs.next() && rs.getDate(2) != null) {
+ lastUpdatedTime = rs.getDate(1).getTime() - rs.getDate(2).getTime();
+ }
+ if (minTimeForStatsUpdate > lastUpdatedTime) {
+ // We need to update the stats table
+ connection.getQueryServices().updateStatistics(analyzeRange, physicalName.getBytes());
+ connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
+ table.getTableName().getBytes(), clientTS);
+ updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
+ return new MutationState(1, connection);
+ } else {
+ return new MutationState(0, connection);
+ }
+ }
+
private MutationState buildIndexAtTimeStamp(PTable index, NamedTableNode dataTableNode) throws SQLException {
// If our connection is at a fixed point-in-time, we need to open a new
// connection so that our new index table is visible.
@@ -1440,7 +1498,8 @@ public class MetaDataClient {
return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false);
}
- private MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType, boolean ifExists, boolean cascade) throws SQLException {
+ private MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType,
+ boolean ifExists, boolean cascade) throws SQLException {
connection.rollback();
boolean wasAutoCommit = connection.getAutoCommit();
try {
@@ -1465,80 +1524,118 @@ public class MetaDataClient {
MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade);
MutationCode code = result.getMutationCode();
- switch(code) {
- case TABLE_NOT_FOUND:
- if (!ifExists) {
- throw new TableNotFoundException(schemaName, tableName);
- }
- break;
- case NEWER_TABLE_FOUND:
- throw new NewerTableAlreadyExistsException(schemaName, tableName);
- case UNALLOWED_TABLE_MUTATION:
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MUTATE_TABLE)
- .setSchemaName(schemaName).setTableName(tableName).build().buildException();
- default:
- connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, result.getMutationTime());
-
- if (result.getTable() != null && tableType != PTableType.VIEW) {
- connection.setAutoCommit(true);
- PTable table = result.getTable();
- boolean dropMetaData = result.getTable().getViewIndexId() == null &&
- connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
- long ts = (scn == null ? result.getMutationTime() : scn);
- // Create empty table and schema - they're only used to get the name from
- // PName name, PTableType type, long timeStamp, long sequenceNumber, List<PColumn> columns
- List<TableRef> tableRefs = Lists.newArrayListWithExpectedSize(2 + table.getIndexes().size());
- // All multi-tenant tables have a view index table, so no need to check in that case
- if (tableType == PTableType.TABLE && (table.isMultiTenant() || hasViewIndexTable || hasLocalIndexTable)) {
- MetaDataUtil.deleteViewIndexSequences(connection, table.getPhysicalName());
- // TODO: consider removing this, as the DROP INDEX done for each DROP VIEW command
- // would have deleted all the rows already
- if (!dropMetaData) {
- if (hasViewIndexTable) {
- String viewIndexSchemaName = null;
- String viewIndexTableName = null;
- if(schemaName != null) {
- viewIndexSchemaName = MetaDataUtil.getViewIndexTableName(schemaName);
- viewIndexTableName = tableName;
- } else {
- viewIndexTableName = MetaDataUtil.getViewIndexTableName(tableName);
- }
- PTable viewIndexTable = new PTableImpl(null, viewIndexSchemaName, viewIndexTableName, ts, table.getColumnFamilies());
- tableRefs.add(new TableRef(null, viewIndexTable, ts, false));
- }
- if (hasLocalIndexTable) {
- String localIndexSchemaName = null;
- String localIndexTableName = null;
- if(schemaName != null) {
- localIndexSchemaName = MetaDataUtil.getLocalIndexTableName(schemaName);
- localIndexTableName = tableName;
- } else {
- localIndexTableName = MetaDataUtil.getLocalIndexTableName(tableName);
- }
- PTable localIndexTable = new PTableImpl(null, localIndexSchemaName, localIndexTableName, ts, Collections.<PColumnFamily>emptyList());
- tableRefs.add(new TableRef(null, localIndexTable, ts, false));
- }
+ switch (code) {
+ case TABLE_NOT_FOUND:
+ if (!ifExists) { throw new TableNotFoundException(schemaName, tableName); }
+ break;
+ case NEWER_TABLE_FOUND:
+ throw new NewerTableAlreadyExistsException(schemaName, tableName);
+ case UNALLOWED_TABLE_MUTATION:
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MUTATE_TABLE)
+
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ default:
+ connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName,
+ result.getMutationTime());
+
+ if (result.getTable() != null && tableType != PTableType.VIEW) {
+ connection.setAutoCommit(true);
+ PTable table = result.getTable();
+ boolean dropMetaData = result.getTable().getViewIndexId() == null &&
+
+ connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
+ long ts = (scn == null ? result.getMutationTime() : scn);
+ // Create empty table and schema - they're only used to get the name from
+ // PName name, PTableType type, long timeStamp, long sequenceNumber, List<PColumn> columns
+ List<TableRef> tableRefs = Lists.newArrayListWithExpectedSize(2 + table.getIndexes().size());
+ // All multi-tenant tables have a view index table, so no need to check in that case
+ if (tableType == PTableType.TABLE
+ && (table.isMultiTenant() || hasViewIndexTable || hasLocalIndexTable)) {
+
+ MetaDataUtil.deleteViewIndexSequences(connection, table.getPhysicalName());
+ if (hasViewIndexTable) {
+ String viewIndexSchemaName = null;
+ String viewIndexTableName = null;
+ if (schemaName != null) {
+ viewIndexSchemaName = MetaDataUtil.getViewIndexTableName(schemaName);
+ viewIndexTableName = tableName;
+ } else {
+ viewIndexTableName = MetaDataUtil.getViewIndexTableName(tableName);
}
+ PTable viewIndexTable = new PTableImpl(null, viewIndexSchemaName, viewIndexTableName, ts,
+ table.getColumnFamilies());
+ tableRefs.add(new TableRef(null, viewIndexTable, ts, false));
}
- if (!dropMetaData) {
- // Delete everything in the column. You'll still be able to do queries at earlier timestamps
- tableRefs.add(new TableRef(null, table, ts, false));
- // TODO: Let the standard mutable secondary index maintenance handle this?
- for (PTable index: table.getIndexes()) {
- tableRefs.add(new TableRef(null, index, ts, false));
+ if (hasLocalIndexTable) {
+ String localIndexSchemaName = null;
+ String localIndexTableName = null;
+ if (schemaName != null) {
+ localIndexSchemaName = MetaDataUtil.getLocalIndexTableName(schemaName);
+ localIndexTableName = tableName;
+ } else {
+ localIndexTableName = MetaDataUtil.getLocalIndexTableName(tableName);
}
- MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null, Collections.<PColumn>emptyList(), ts);
- return connection.getQueryServices().updateData(plan);
+ PTable localIndexTable = new PTableImpl(null, localIndexSchemaName, localIndexTableName,
+ ts, Collections.<PColumnFamily> emptyList());
+ tableRefs.add(new TableRef(null, localIndexTable, ts, false));
}
}
- break;
+ tableRefs.add(new TableRef(null, table, ts, false));
+ // TODO: Let the standard mutable secondary index maintenance handle this?
+ for (PTable index : table.getIndexes()) {
+ tableRefs.add(new TableRef(null, index, ts, false));
+ }
+ deleteFromStatsTable(tableRefs, ts);
+ if (!dropMetaData) {
+ MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null,
+ Collections.<PColumn> emptyList(), ts);
+ // Delete everything in the column. You'll still be able to do queries at earlier timestamps
+ return connection.getQueryServices().updateData(plan);
+ }
}
- return new MutationState(0,connection);
+ break;
+ }
+ return new MutationState(0, connection);
} finally {
connection.setAutoCommit(wasAutoCommit);
}
}
+ private void deleteFromStatsTable(List<TableRef> tableRefs, long ts) throws SQLException {
+ Properties props = new Properties(connection.getClientInfo());
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ Connection conn = DriverManager.getConnection(connection.getURL(), props);
+ conn.setAutoCommit(true);
+ boolean success = false;
+ SQLException sqlException = null;
+ try {
+ StringBuilder buf = new StringBuilder("DELETE FROM SYSTEM.STATS WHERE PHYSICAL_NAME IN (");
+ for (TableRef ref : tableRefs) {
+ buf.append("'" + ref.getTable().getName().getString() + "',");
+ }
+ buf.setCharAt(buf.length() - 1, ')');
+ conn.createStatement().execute(buf.toString());
+ success = true;
+ } catch (SQLException e) {
+ sqlException = e;
+ } finally {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ if (sqlException == null) {
+ // If we're not in the middle of throwing another exception
+ // then throw the exception we got on close.
+ if (success) {
+ sqlException = e;
+ }
+ } else {
+ sqlException.setNextException(e);
+ }
+ }
+ if (sqlException != null) { throw sqlException; }
+ }
+ }
+
private MutationCode processMutationResult(String schemaName, String tableName, MetaDataMutationResult result) throws SQLException {
final MutationCode mutationCode = result.getMutationCode();
PName tenantId = connection.getTenantId();
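As a usage note, MetaDataClient.updateStatistics above only re-collects when the previous update is older than phoenix.query.statsUpdateFrequency. A minimal sketch of overriding that interval on a client connection; the URL and value are illustrative:

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

import org.apache.phoenix.query.QueryServices;

public class StatsUpdateFreqSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Allow stats to be re-collected once the last run is more than one minute old.
        props.setProperty(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(60000));
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
        // ... issue the statement that maps to UpdateStatisticsStatement here ...
        conn.close();
    }
}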
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
index 24da14d..01c236f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.schema;
import java.util.Collection;
+import java.util.List;
/**
*
@@ -51,4 +52,6 @@ public interface PColumnFamily {
PColumn getColumn(String name) throws ColumnNotFoundException;
int getEstimatedSize();
+
+ List<byte[]> getGuidePosts();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
index 5ccd50b..15ac8fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.schema;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -34,6 +35,7 @@ public class PColumnFamilyImpl implements PColumnFamily {
private final Map<String, PColumn> columnByString;
private final Map<byte[], PColumn> columnByBytes;
private final int estimatedSize;
+ private List<byte[]> guidePosts = Collections.emptyList();
@Override
public int getEstimatedSize() {
@@ -41,9 +43,23 @@ public class PColumnFamilyImpl implements PColumnFamily {
}
public PColumnFamilyImpl(PName name, List<PColumn> columns) {
+ this(name, columns, null);
+ }
+
+ public PColumnFamilyImpl(PName name, List<PColumn> columns, List<byte[]> guidePosts) {
Preconditions.checkNotNull(name);
- long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 4 + SizedUtil.INT_SIZE + name.getEstimatedSize() +
- SizedUtil.sizeOfMap(columns.size()) * 2 + SizedUtil.sizeOfArrayList(columns.size());
+ // Include guidePosts also in estimating the size
+ int guidePostsSize = 0;
+ if(guidePosts != null) {
+ guidePostsSize = guidePosts.size();
+ for(byte[] gps : guidePosts) {
+ guidePostsSize += gps.length;
+ }
+ Collections.sort(guidePosts, Bytes.BYTES_COMPARATOR);
+ this.guidePosts = guidePosts;
+ }
+ long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 5 + SizedUtil.INT_SIZE + name.getEstimatedSize() +
+ SizedUtil.sizeOfMap(columns.size()) * 2 + SizedUtil.sizeOfArrayList(columns.size()) + SizedUtil.sizeOfArrayList(guidePostsSize);
this.name = name;
this.columns = ImmutableList.copyOf(columns);
ImmutableMap.Builder<String, PColumn> columnByStringBuilder = ImmutableMap.builder();
@@ -85,4 +101,9 @@ public class PColumnFamilyImpl implements PColumnFamily {
}
return column;
}
+
+ @Override
+ public List<byte[]> getGuidePosts() {
+ return guidePosts;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index cbf0dad..374b10c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.schema.stat.PTableStats;
/**
@@ -253,10 +252,11 @@ public interface PTable {
int newKey(ImmutableBytesWritable key, byte[][] values);
/**
- * Return the statistics table associated with this PTable.
+ * Return the guide posts associated with this PTable. A list of
+ * guide posts is returned.
* @return the statistics table.
*/
- PTableStats getTableStats();
+ List<byte[]> getGuidePosts();
RowKeySchema getRowKeySchema();
@@ -316,4 +316,5 @@ public interface PTable {
int getEstimatedSize();
IndexType getIndexType();
+
}
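To illustrate how the sorted guide posts returned by getGuidePosts() can guide parallelization, here is a self-contained sketch that splits a scan range at guide-post boundaries. The split helper is hypothetical, not part of the patch, and assumes a non-empty stop row.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;

public class GuidePostSplitSketch {

    /** Split [startRow, stopRow) into chunks at every guide post that falls strictly inside the range. */
    static List<byte[][]> split(byte[] startRow, byte[] stopRow, List<byte[]> sortedGuidePosts) {
        List<byte[][]> chunks = new ArrayList<byte[][]>();
        byte[] lower = startRow;
        for (byte[] gp : sortedGuidePosts) {
            if (Bytes.compareTo(gp, lower) > 0 && Bytes.compareTo(gp, stopRow) < 0) {
                chunks.add(new byte[][] { lower, gp });
                lower = gp;
            }
        }
        chunks.add(new byte[][] { lower, stopRow });
        return chunks;
    }

    public static void main(String[] args) {
        List<byte[]> guidePosts = new ArrayList<byte[]>();
        guidePosts.add(Bytes.toBytes("g"));
        guidePosts.add(Bytes.toBytes("m"));
        // Expected output: a-g, g-m, m-z -- each chunk can be scanned by a separate thread.
        for (byte[][] chunk : split(Bytes.toBytes("a"), Bytes.toBytes("z"), guidePosts)) {
            System.out.println(Bytes.toString(chunk[0]) + " - " + Bytes.toString(chunk[1]));
        }
    }
}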
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index abe637d..41faaf2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -26,11 +26,10 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
@@ -96,8 +95,6 @@ public class PTableImpl implements PTable {
private ListMultimap<String,PColumn> columnsByName;
private PName pkName;
private Integer bucketNum;
- // Statistics associated with this table.
- private PTableStats stats;
private RowKeySchema rowKeySchema;
// Indexes associated with this table.
private List<PTable> indexes;
@@ -116,6 +113,7 @@ public class PTableImpl implements PTable {
private Short viewIndexId;
private int estimatedSize;
private IndexType indexType;
+ private List<byte[]> guidePosts = Collections.emptyList();
public PTableImpl() {
this.indexes = Collections.emptyList();
@@ -213,6 +211,17 @@ public class PTableImpl implements PTable {
return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataTableName,
indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType);
}
+
+ public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
+ PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum,
+ List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
+ List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
+ boolean multiTenant, ViewType viewType, Short viewIndexId, IndexType indexType, PTableStats stats)
+ throws SQLException {
+ return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
+ bucketNum, columns, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
+ viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType, stats);
+ }
private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
PName pkName, Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
@@ -220,6 +229,16 @@ public class PTableImpl implements PTable {
init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
new PTableStatsImpl(), dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType);
}
+
+ private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
+ long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, List<PColumn> columns,
+ PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames,
+ PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType,
+ Short viewIndexId, IndexType indexType, PTableStats stats) throws SQLException {
+ init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
+ stats, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression,
+ disableWAL, multiTenant, viewType, viewIndexId, indexType);
+ }
@Override
public boolean isMultiTenant() {
@@ -240,7 +259,7 @@ public class PTableImpl implements PTable {
private void init(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentTableName, List<PTable> indexes,
boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
- ViewType viewType, Short viewIndexId, IndexType indexType) throws SQLException {
+ ViewType viewType, Short viewIndexId, IndexType indexType ) throws SQLException {
Preconditions.checkNotNull(schemaName);
Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -330,16 +349,40 @@ public class PTableImpl implements PTable {
columnsInFamily.add(column);
}
}
-
this.rowKeySchema = builder.build();
estimatedSize += rowKeySchema.getEstimatedSize();
Iterator<Map.Entry<PName,List<PColumn>>> iterator = familyMap.entrySet().iterator();
PColumnFamily[] families = new PColumnFamily[familyMap.size()];
+ if (families.length == 0) {
+ if(stats != null) {
+ byte[] defaultFamilyNameBytes = null;
+ if(defaultFamilyName == null) {
+ defaultFamilyNameBytes = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+ } else {
+ defaultFamilyNameBytes = defaultFamilyName.getBytes();
+ }
+ if (stats.getGuidePosts().get(defaultFamilyNameBytes) != null) {
+ guidePosts = stats.getGuidePosts().get(defaultFamilyNameBytes);
+ if (guidePosts != null) {
+ Collections.sort(guidePosts, Bytes.BYTES_COMPARATOR);
+ estimatedSize += SizedUtil.sizeOfArrayList(guidePosts.size());
+ for (byte[] gps : guidePosts) {
+ estimatedSize += gps.length;
+ }
+ }
+ }
+ }
+ }
ImmutableMap.Builder<String, PColumnFamily> familyByString = ImmutableMap.builder();
- ImmutableSortedMap.Builder<byte[], PColumnFamily> familyByBytes = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+ ImmutableSortedMap.Builder<byte[], PColumnFamily> familyByBytes = ImmutableSortedMap
+ .orderedBy(Bytes.BYTES_COMPARATOR);
+ List<byte[]> famGuidePosts = null;
for (int i = 0; i < families.length; i++) {
Map.Entry<PName,List<PColumn>> entry = iterator.next();
- PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue());
+ if (stats != null) {
+ famGuidePosts = stats.getGuidePosts().get(entry.getKey().getBytes());
+ }
+ PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue(), famGuidePosts);
families[i] = family;
familyByString.put(family.getName().getString(), family);
familyByBytes.put(family.getName().getBytes(), family);
@@ -350,8 +393,6 @@ public class PTableImpl implements PTable {
this.familyByString = familyByString.build();
estimatedSize += SizedUtil.sizeOfArrayList(families.length);
estimatedSize += SizedUtil.sizeOfMap(families.length) * 2;
-
- this.stats = stats;
this.indexes = indexes == null ? Collections.<PTable>emptyList() : indexes;
for (PTable index : this.indexes) {
estimatedSize += index.getEstimatedSize();
@@ -693,8 +734,8 @@ public class PTableImpl implements PTable {
}
@Override
- public PTableStats getTableStats() {
- return stats;
+ public List<byte[]> getGuidePosts() {
+ return guidePosts;
}
@Override
@@ -851,14 +892,15 @@ public class PTableImpl implements PTable {
for (PTableProtos.PTable curPTableProto : table.getIndexesList()) {
indexes.add(createFromProto(curPTableProto));
}
+
boolean isImmutableRows = table.getIsImmutableRows();
- Map<String, byte[][]> guidePosts = new HashMap<String, byte[][]>();
+ TreeMap<byte[], List<byte[]>> tableGuidePosts = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
for (PTableProtos.PTableStats pTableStatsProto : table.getGuidePostsList()) {
- byte[][] value = new byte[pTableStatsProto.getValuesCount()][];
- for (int j = 0; j < pTableStatsProto.getValuesCount(); j++) {
- value[j] = pTableStatsProto.getValues(j).toByteArray();
- }
- guidePosts.put(pTableStatsProto.getKey(), value);
+ List<byte[]> value = Lists.newArrayListWithExpectedSize(pTableStatsProto.getValuesCount());
+ for (int j = 0; j < pTableStatsProto.getValuesCount(); j++) {
+ value.add(pTableStatsProto.getValues(j).toByteArray());
+ }
+ tableGuidePosts.put(pTableStatsProto.getKeyBytes().toByteArray(), value);
}
PName dataTableName = null;
if (table.hasDataTableNameBytes()) {
@@ -886,7 +928,7 @@ public class PTableImpl implements PTable {
}
}
- PTableStats stats = new PTableStatsImpl(guidePosts);
+ PTableStats stats = new PTableStatsImpl(tableGuidePosts);
try {
PTableImpl result = new PTableImpl();
result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
@@ -937,24 +979,37 @@ public class PTableImpl implements PTable {
PColumn column = columns.get(i);
builder.addColumns(PColumnImpl.toProto(column));
}
+
List<PTable> indexes = table.getIndexes();
for (PTable curIndex : indexes) {
builder.addIndexes(toProto(curIndex));
}
builder.setIsImmutableRows(table.isImmutableRows());
- // build stats
- Map<String, byte[][]> statsMap = table.getTableStats().getGuidePosts();
- if(statsMap != null) {
- for (Entry<String, byte[][]> entry : statsMap.entrySet()) {
- PTableProtos.PTableStats.Builder statsBuilder = PTableProtos.PTableStats.newBuilder();
- statsBuilder.setKey(entry.getKey());
- for (byte[] curVal : entry.getValue()) {
- statsBuilder.addValues(HBaseZeroCopyByteString.wrap(curVal));
- }
- builder.addGuidePosts(statsBuilder.build());
- }
- }
+ // build stats for the table
+ if (table.getColumnFamilies().isEmpty() && !table.getGuidePosts().isEmpty()) {
+ List<byte[]> stats = table.getGuidePosts();
+ if (stats != null) {
+ PTableProtos.PTableStats.Builder statsBuilder = PTableProtos.PTableStats.newBuilder();
+ statsBuilder.setKey(Bytes.toString(SchemaUtil.getEmptyColumnFamily(table)));
+ for (byte[] stat : stats) {
+ statsBuilder.addValues(HBaseZeroCopyByteString.wrap(stat));
+ }
+ builder.addGuidePosts(statsBuilder.build());
+ }
+ } else {
+ for (PColumnFamily fam : table.getColumnFamilies()) {
+ PTableProtos.PTableStats.Builder statsBuilder = PTableProtos.PTableStats.newBuilder();
+ if (fam.getGuidePosts() != null) {
+ statsBuilder.setKey(fam.getName().getString());
+ for (byte[] stat : fam.getGuidePosts()) {
+ statsBuilder.addValues(HBaseZeroCopyByteString.wrap(stat));
+ }
+ builder.addGuidePosts(statsBuilder.build());
+ }
+ }
+ }
+
if (table.getParentName() != null) {
builder.setDataTableNameBytes(HBaseZeroCopyByteString.wrap(table.getParentTableName().getBytes()));
}
@@ -980,4 +1035,5 @@ public class PTableImpl implements PTable {
public PTableKey getKey() {
return key;
}
+
}
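
For context, a minimal sketch (not part of the patch) of how a caller might read guide posts through the reworked API above: tables that declare no column families keep their guide posts at the table level, otherwise each PColumnFamily carries the list collected for it. The helper class and method names below are illustrative only; only the patched PTable/PColumnFamily accessors shown in the hunks above are assumed.

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.schema.PColumnFamily;
    import org.apache.phoenix.schema.PTable;

    final class GuidePostLookupSketch {
        // Returns the guide posts for the given column family, falling back to the
        // table-level list for tables that declare no column families.
        static List<byte[]> guidePostsFor(PTable table, byte[] family) {
            if (table.getColumnFamilies().isEmpty()) {
                return table.getGuidePosts();
            }
            for (PColumnFamily fam : table.getColumnFamilies()) {
                if (Bytes.equals(fam.getName().getBytes(), family)) {
                    List<byte[]> posts = fam.getGuidePosts();
                    return posts == null ? Collections.<byte[]>emptyList() : posts;
                }
            }
            return Collections.<byte[]>emptyList();
        }
    }
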
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
index 5d867ef..dfda457 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
@@ -17,28 +17,19 @@
*/
package org.apache.phoenix.schema.stat;
-import java.util.Map;
+import java.util.List;
+import java.util.TreeMap;
-import org.apache.hadoop.hbase.HRegionInfo;
-
-/**
- * Interface for Phoenix table statistics. Statistics is collected on the server
- * side and can be used for various purpose like splitting region for scanning, etc.
- *
- * The table is defined on the client side, but it is populated on the server side. The client
- * should not populate any data to the statistics object.
+/**
+ * Interface for Phoenix table statistics. The statistics object is defined on the client side
+ * but populated on the server side; the client should not write any data into it.
*/
public interface PTableStats {
-
/**
- * Given the region info, returns an array of bytes that is the current estimate of key
- * distribution inside that region. The keys should split that region into equal chunks.
- *
- * @param region
- * @return array of keys
+ * Returns the guide posts collected for this table, keyed by column family.
+ * @return a tree map from column family name bytes to that family's sorted guide post keys
*/
- byte[][] getRegionGuidePosts(HRegionInfo region);
+ TreeMap<byte[], List<byte[]>> getGuidePosts();
- Map<String, byte[][]> getGuidePosts();
-}
+}
\ No newline at end of file
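
The move from Map<String, byte[][]> to TreeMap<byte[], List<byte[]>> keyed by column family bytes depends on an explicit comparator: byte[] does not implement Comparable, so the map must be built with Bytes.BYTES_COMPARATOR, which also gives content-based ordering so a freshly allocated key with the same bytes finds its entry. A small standalone sketch (illustrative values only, assuming just hbase-common on the classpath):

    import java.util.Arrays;
    import java.util.List;
    import java.util.TreeMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GuidePostMapSketch {
        public static void main(String[] args) {
            // byte[] is not Comparable, so the map needs an explicit comparator;
            // Bytes.BYTES_COMPARATOR compares content rather than array identity.
            TreeMap<byte[], List<byte[]>> guidePosts =
                    new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
            guidePosts.put(Bytes.toBytes("CF1"),
                    Arrays.asList(Bytes.toBytes("row100"), Bytes.toBytes("row200")));
            // A fresh byte[] with the same contents finds the entry:
            List<byte[]> posts = guidePosts.get(Bytes.toBytes("CF1"));
            System.out.println(posts.size()); // prints 2
        }
    }
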
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
index 8690a17..3e8f1e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
@@ -16,40 +16,31 @@
* limitations under the License.
*/
package org.apache.phoenix.schema.stat;
+import java.util.List;
+import java.util.TreeMap;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-
-import com.google.common.collect.ImmutableMap;
-
-
-/**
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
* Implementation for PTableStats.
*/
public class PTableStatsImpl implements PTableStats {
- // The map for guide posts should be immutable. We only take the current snapshot from outside
- // method call and store it.
- private Map<String, byte[][]> regionGuidePosts;
+ public static final PTableStats NO_STATS = new PTableStatsImpl();
- public PTableStatsImpl() { }
+ private TreeMap<byte[], List<byte[]>> guidePosts = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
- public PTableStatsImpl(Map<String, byte[][]> stats) {
- regionGuidePosts = ImmutableMap.copyOf(stats);
+ public PTableStatsImpl() {
+ this(new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR));
}
- @Override
- public byte[][] getRegionGuidePosts(HRegionInfo region) {
- return regionGuidePosts.get(region.getRegionNameAsString());
+ public PTableStatsImpl(TreeMap<byte[], List<byte[]>> guidePosts) {
+ this.guidePosts = guidePosts;
}
@Override
- public Map<String, byte[][]> getGuidePosts(){
- if(regionGuidePosts != null) {
- return ImmutableMap.copyOf(regionGuidePosts);
- }
-
- return null;
+ public TreeMap<byte[], List<byte[]>> getGuidePosts() {
+ return guidePosts;
}
+
}
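
A short sketch (not part of the patch; family name and keys are illustrative) of how the reworked holder above might be populated, and of the shared empty sentinel:

    import java.util.Arrays;
    import java.util.List;
    import java.util.TreeMap;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.schema.stat.PTableStats;
    import org.apache.phoenix.schema.stat.PTableStatsImpl;

    class StatsBuildSketch {
        static PTableStats buildSampleStats() {
            TreeMap<byte[], List<byte[]>> byFamily =
                    new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
            // Two guide posts splitting the family's key range into roughly equal thirds
            byFamily.put(Bytes.toBytes("0"),
                    Arrays.asList(Bytes.toBytes("row0333"), Bytes.toBytes("row0666")));
            return new PTableStatsImpl(byFamily);
        }

        static PTableStats noStats() {
            // Tables with no collected statistics can share the empty sentinel
            return PTableStatsImpl.NO_STATS;
        }
    }
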