You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/14 07:37:33 UTC
[17/21] phoenix git commit: PHOENIX-4579 Add a config to
conditionally create Phoenix meta tables on first client connection (Chinmay
Kulkarni)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cda8141/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 5cb14d6..fa5d7e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -170,6 +170,7 @@ import org.apache.phoenix.exception.RetriableUpgradeException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.exception.UpgradeInProgressException;
+import org.apache.phoenix.exception.UpgradeRequiredException;
import org.apache.phoenix.exception.UpgradeNotRequiredException;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
@@ -1041,23 +1042,69 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
/**
*
- * @param tableName
+ * @param physicalTableName
+ * @param tableType
+ * @param props
+ * @param families
* @param splits
- * @param modifyExistingMetaData TODO
+ * @param modifyExistingMetaData
+ * @param isNamespaceMapped
+ * @param isDoNotUpgradePropSet
* @return true if table was created and false if it already exists
* @throws SQLException
*/
private HTableDescriptor ensureTableCreated(byte[] physicalTableName, PTableType tableType, Map<String, Object> props,
List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean modifyExistingMetaData,
- boolean isNamespaceMapped) throws SQLException {
+ boolean isNamespaceMapped, boolean isDoNotUpgradePropSet) throws SQLException {
SQLException sqlE = null;
HTableDescriptor existingDesc = null;
boolean isMetaTable = SchemaUtil.isMetaTable(physicalTableName);
boolean tableExist = true;
try (HBaseAdmin admin = getAdmin()) {
final String quorum = ZKConfig.getZKQuorumServersString(config);
- final String znode = this.props.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+ final String znode = this.getProps().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
logger.debug("Found quorum: " + quorum + ":" + znode);
+
+ if (isMetaTable) {
+ if(SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, this.getProps())) {
+ try {
+ // SYSTEM namespace needs to be created via HBase APIs because "CREATE SCHEMA" statement tries to write
+ // its metadata in SYSTEM:CATALOG table. Without SYSTEM namespace, SYSTEM:CATALOG table cannot be created
+ ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME);
+ } catch (PhoenixIOException e) {
+ // We could either:
+ // 1) Not access the NS descriptor. The NS may or may not exist at this point
+ // 2) We could not create the NS
+ // Regardless of the case 1 or 2, if we eventually try to migrate SYSTEM tables to the SYSTEM
+ // namespace using the {@link ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps)} method,
+ // if the NS does not exist, we will error as expected, or
+ // if the NS does exist and tables are already mapped, the check will exit gracefully
+ }
+ if (admin.tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, false))) {
+ // SYSTEM.CATALOG exists, so at this point, we have 3 cases:
+ // 1) If server-side namespace mapping is disabled, throw Inconsistent namespace mapping exception
+ // 2) If server-side namespace mapping is enabled and SYSCAT needs to be upgraded, upgrade SYSCAT
+ // and also migrate SYSTEM tables to the SYSTEM namespace
+ // 3. If server-side namespace mapping is enabled and SYSCAT doesn't need to be upgraded, we still
+ // need to migrate SYSTEM tables to the SYSTEM namespace using the
+ // {@link ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps)} method (as part of
+ // {@link upgradeSystemTables(String, Properties)})
+ checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES);
+ // Thrown so we can force an upgrade which will just migrate SYSTEM tables to the SYSTEM namespace
+ throw new UpgradeRequiredException(MIN_SYSTEM_TABLE_TIMESTAMP);
+ }
+ } else if (admin.tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true))) {
+ // If SYSTEM:CATALOG exists, but client-side namespace mapping for SYSTEM tables is disabled, throw an exception
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.INCONSISTENT_NAMESPACE_MAPPING_PROPERTIES)
+ .setMessage("Cannot initiate connection as "
+ + SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true)
+ + " is found but client does not have "
+ + IS_NAMESPACE_MAPPING_ENABLED + " enabled")
+ .build().buildException();
+ }
+ }
+
try {
existingDesc = admin.getTableDescriptor(physicalTableName);
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
@@ -1075,6 +1122,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
splits, isNamespaceMapped);
if (!tableExist) {
+ if (isMetaTable && !isUpgradeRequired() && (!isAutoUpgradeEnabled || isDoNotUpgradePropSet)) {
+ // Disallow creating the SYSTEM.CATALOG or SYSTEM:CATALOG HBase table
+ throw new UpgradeRequiredException();
+ }
if (newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null && Boolean.TRUE.equals(
PBoolean.INSTANCE.toObject(newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName());
@@ -1092,9 +1143,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} catch (TableExistsException e) {
// We can ignore this, as it just means that another client beat us
// to creating the HBase metadata.
+ if (isMetaTable && !isUpgradeRequired()) {
+ checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
+ }
return null;
}
- if (isMetaTable) {
+ if (isMetaTable && !isUpgradeRequired()) {
checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
/*
* Now we modify the table to add the split policy, since we know that the client and
@@ -1106,7 +1160,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
return null;
} else {
- if (isMetaTable) {
+ if (isMetaTable && !isUpgradeRequired()) {
checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
} else {
for(Pair<byte[],Map<String,Object>> family: families) {
@@ -1120,7 +1174,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
-
if (!modifyExistingMetaData) {
return existingDesc; // Caller already knows that no metadata was changed
}
@@ -1143,7 +1196,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return null; // Indicate that no metadata was changed
}
- modifyTable(physicalTableName, newDesc, true);
+ // Do not call modifyTable for SYSTEM tables
+ if (tableType != PTableType.SYSTEM) {
+ modifyTable(physicalTableName, newDesc, true);
+ }
return newDesc;
}
@@ -1198,6 +1254,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
boolean isIncompatible = false;
int minHBaseVersion = Integer.MAX_VALUE;
boolean isTableNamespaceMappingEnabled = false;
+ long systemCatalogTimestamp = Long.MAX_VALUE;
HTableInterface ht = null;
try {
List<HRegionLocation> locations = this
@@ -1214,36 +1271,44 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
ht = this.getTable(metaTable);
- final Map<byte[], Long> results =
- ht.coprocessorService(MetaDataService.class, null, null, new Batch.Call<MetaDataService,Long>() {
+ final Map<byte[], GetVersionResponse> results =
+ ht.coprocessorService(MetaDataService.class, null, null, new Batch.Call<MetaDataService,GetVersionResponse>() {
@Override
- public Long call(MetaDataService instance) throws IOException {
+ public GetVersionResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<GetVersionResponse> rpcCallback =
- new BlockingRpcCallback<GetVersionResponse>();
+ new BlockingRpcCallback<>();
GetVersionRequest.Builder builder = GetVersionRequest.newBuilder();
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.getVersion(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
- return rpcCallback.get().getVersion();
+ return rpcCallback.get();
}
});
- for (Map.Entry<byte[],Long> result : results.entrySet()) {
+ for (Map.Entry<byte[],GetVersionResponse> result : results.entrySet()) {
// This is the "phoenix.jar" is in-place, but server is out-of-sync with client case.
- long version = result.getValue();
- isTableNamespaceMappingEnabled |= MetaDataUtil.decodeTableNamespaceMappingEnabled(version);
+ GetVersionResponse versionResponse = result.getValue();
+ long serverJarVersion = versionResponse.getVersion();
+ isTableNamespaceMappingEnabled |= MetaDataUtil.decodeTableNamespaceMappingEnabled(serverJarVersion);
- if (!isCompatible(result.getValue())) {
+ if (!isCompatible(serverJarVersion)) {
isIncompatible = true;
HRegionLocation name = regionMap.get(result.getKey());
buf.append(name);
buf.append(';');
}
- hasIndexWALCodec &= hasIndexWALCodec(result.getValue());
- if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(result.getValue())) {
- minHBaseVersion = MetaDataUtil.decodeHBaseVersion(result.getValue());
+ hasIndexWALCodec &= hasIndexWALCodec(serverJarVersion);
+ if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(serverJarVersion)) {
+ minHBaseVersion = MetaDataUtil.decodeHBaseVersion(serverJarVersion);
+ }
+ // In case this is the first time connecting to this cluster, the system catalog table does not have an
+ // entry for itself yet, so we cannot get the timestamp and this will not be returned from the
+ // GetVersionResponse message object
+ if (versionResponse.hasSystemCatalogTimestamp()) {
+ systemCatalogTimestamp = systemCatalogTimestamp < versionResponse.getSystemCatalogTimestamp() ?
+ systemCatalogTimestamp: versionResponse.getSystemCatalogTimestamp();
}
}
if (isTableNamespaceMappingEnabled != SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE,
@@ -1274,6 +1339,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
buf.setLength(buf.length()-1);
throw new SQLExceptionInfo.Builder(SQLExceptionCode.OUTDATED_JARS).setMessage(buf.toString()).build().buildException();
}
+ if (systemCatalogTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP) {
+ throw new UpgradeRequiredException(systemCatalogTimestamp);
+ }
}
/**
@@ -1334,14 +1402,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
tableProps.put(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING);
HTableDescriptor desc = ensureTableCreated(physicalIndexName, PTableType.TABLE, tableProps, families, splits,
- false, isNamespaceMapped);
+ false, isNamespaceMapped, false);
if (desc != null) {
if (!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) {
String fullTableName = Bytes.toString(physicalIndexName);
throw new TableAlreadyExistsException(
- "Unable to create shared physical table for indexes on views.",
- SchemaUtil.getSchemaNameFromFullName(fullTableName),
- SchemaUtil.getTableNameFromFullName(fullTableName));
+ SchemaUtil.getSchemaNameFromFullName(fullTableName),
+ SchemaUtil.getTableNameFromFullName(fullTableName),
+ "Unable to create shared physical table for indexes on views.");
}
}
}
@@ -1409,8 +1477,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, final byte[] physicalTableName,
PTableType tableType, Map<String, Object> tableProps,
- final List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped, final boolean allocateIndexId)
- throws SQLException {
+ final List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped,
+ final boolean allocateIndexId, final boolean isDoNotUpgradePropSet) throws SQLException {
byte[][] rowKeyMetadata = new byte[3][];
Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetaData);
byte[] key = m.getRow();
@@ -1430,7 +1498,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) {
// For views this will ensure that metadata already exists
// For tables and indexes, this will create the metadata if it doesn't already exist
- ensureTableCreated(tableName, tableType, tableProps, families, splits, true, isNamespaceMapped);
+ ensureTableCreated(tableName, tableType, tableProps, families, splits, true, isNamespaceMapped, isDoNotUpgradePropSet);
}
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (tableType == PTableType.INDEX) { // Index on view
@@ -2436,30 +2504,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
openConnection();
hConnectionEstablished = true;
boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props);
- try (HBaseAdmin admin = getAdmin()) {
- boolean mappedSystemCatalogExists = admin
- .tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true));
- if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
- ConnectionQueryServicesImpl.this.getProps())) {
- if (admin.tableExists(SYSTEM_CATALOG_NAME_BYTES)) {
- //check if the server is already updated and have namespace config properly set.
- checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES);
- }
-
- // If SYSTEM tables exist, they are migrated to HBase SYSTEM namespace
- // If they don't exist, this method will create HBase SYSTEM namespace and return
- ensureSystemTablesMigratedToSystemNamespace(ConnectionQueryServicesImpl.this.getProps());
- } else if (mappedSystemCatalogExists) {
- throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.INCONSISTENT_NAMESPACE_MAPPING_PROPERTIES)
- .setMessage("Cannot initiate connection as "
- + SchemaUtil.getPhysicalTableName(
- SYSTEM_CATALOG_NAME_BYTES, true)
- + " is found but client does not have "
- + IS_NAMESPACE_MAPPING_ENABLED + " enabled")
- .build().buildException();
- }
- }
Properties scnProps = PropertiesUtil.deepCopy(props);
scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(getSystemTableVersion()));
@@ -2508,38 +2552,37 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
initializationException = e;
}
return null;
+ } catch (UpgradeRequiredException e) {
+ // This will occur in 3 cases:
+ // 1. SYSTEM.CATALOG does not exist and we don't want to allow the user to create it i.e.
+ // !isAutoUpgradeEnabled or isDoNotUpgradePropSet is set
+ // 2. SYSTEM.CATALOG exists and its timestamp < MIN_SYSTEM_TABLE_TIMESTAMP
+ // 3. SYSTEM.CATALOG exists, but client and server-side namespace mapping is enabled so
+ // we need to migrate SYSTEM tables to the SYSTEM namespace
+ setUpgradeRequired();
}
- // HBase Namespace SYSTEM is created by {@link ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps)} method
- // This statement will create its entry in SYSCAT table, so that GRANT/REVOKE commands can work
- // with SYSTEM Namespace. (See PHOENIX-4227 https://issues.apache.org/jira/browse/PHOENIX-4227)
- if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
- ConnectionQueryServicesImpl.this.getProps())) {
- try {
- metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS "
- + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
- } catch (NewerSchemaAlreadyExistsException e) {
- // Older clients with appropriate perms may try getting a new connection
- // This results in NewerSchemaAlreadyExistsException, so we can safely ignore it here
- } catch (PhoenixIOException e) {
- if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) {
- // Ignore ADE
- } else {
- throw e;
- }
- }
- }
if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
createOtherSystemTables(metaConnection, hBaseAdmin);
+ // In case namespace mapping is enabled and system table to system namespace mapping is also enabled,
+ // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work
+ // with SYSTEM Namespace
+ createSchemaIfNotExistsSystemNSMappingEnabled(metaConnection);
} else if (isAutoUpgradeEnabled && !isDoNotUpgradePropSet) {
+ // Upgrade is required and we are allowed to automatically upgrade
upgradeSystemTables(url, props);
+ } else {
+ // We expect the user to manually run the "EXECUTE UPGRADE" command first.
+ // This exception will get caught below as a RetriableUpgradeException
+ throw new UpgradeRequiredException();
}
}
scheduleRenewLeaseTasks();
success = true;
} catch (RetriableUpgradeException e) {
- // Don't set it as initializationException because otherwise the clien't won't be able
- // to retry establishing connection.
+ // Set success to true and don't set the exception as an initializationException,
+ // because otherwise the client won't be able to retry establishing the connection.
+ success = true;
throw e;
} catch (Exception e) {
if (e instanceof SQLException) {
@@ -2580,7 +2623,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- void createSysMutexTableIfNotExists(HBaseAdmin admin, ReadOnlyProps props) throws IOException, SQLException {
+ void createSysMutexTableIfNotExists(HBaseAdmin admin) throws IOException, SQLException {
try {
if(admin.tableExists(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME) || admin.tableExists(TableName.valueOf(
PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME,PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME))) {
@@ -2588,7 +2631,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return;
}
final TableName mutexTableName = SchemaUtil.getPhysicalTableName(
- PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, props);
+ PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, this.getProps());
HTableDescriptor tableDesc = new HTableDescriptor(mutexTableName);
HColumnDescriptor columnDesc = new HColumnDescriptor(
PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES);
@@ -2640,7 +2683,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} catch (TableAlreadyExistsException ignore) {}
// Catch the IOException to log the error message and then bubble it up for the client to retry.
try {
- createSysMutexTableIfNotExists(hbaseAdmin, ConnectionQueryServicesImpl.this.getProps());
+ createSysMutexTableIfNotExists(hbaseAdmin);
} catch (IOException exception) {
logger.error("Failed to created SYSMUTEX table. Upgrade or migration is not possible without it. Please retry.");
throw exception;
@@ -2648,6 +2691,265 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
/**
+ * Creates an entry for the SYSTEM namespace in the SYSCAT table by executing a
+ * "CREATE SCHEMA IF NOT EXISTS" statement on the given connection, so that GRANT/REVOKE commands can work with
+ * the SYSTEM namespace. The statement is only issued when
+ * {@code SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, getProps())} is true, i.e. in case namespace
+ * mapping is enabled and system table to system namespace mapping is also enabled. If not enabled, this method
+ * returns immediately without doing anything.
+ * <p>
+ * A NewerSchemaAlreadyExistsException is ignored, and so is a PhoenixIOException whose causal chain contains an
+ * AccessDeniedException. Any other PhoenixIOException is rethrown.
+ * @param metaConnection connection on which the CREATE SCHEMA statement is executed
+ * @throws SQLException if executing the statement fails for a reason other than the two ignored cases above
+ */
+ private void createSchemaIfNotExistsSystemNSMappingEnabled(PhoenixConnection metaConnection) throws SQLException {
+ // HBase Namespace SYSTEM is assumed to be already created inside {@link #ensureTableCreated(byte[], PTableType,
+ // Map<String, Object>, List<Pair<byte[], Map<String, Object>>>, byte[][], boolean, boolean, boolean)}.
+ // This statement will create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE
+ // commands can work with SYSTEM Namespace. (See PHOENIX-4227 https://issues.apache.org/jira/browse/PHOENIX-4227)
+ if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
+ ConnectionQueryServicesImpl.this.getProps())) {
+ try {
+ metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS "
+ + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+ } catch (NewerSchemaAlreadyExistsException e) {
+ // Older clients with appropriate perms may try getting a new connection.
+ // This results in NewerSchemaAlreadyExistsException, so we can safely ignore it here.
+ } catch (PhoenixIOException e) {
+ if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) {
+ // Ignore ADE: an AccessDeniedException anywhere in the causal chain is deliberately swallowed
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * Upgrades the SYSTEM.CATALOG (SYSCAT) schema if required. The server-side table timestamp is compared
+ * against each per-release MIN_SYSTEM_TABLE_TIMESTAMP_* threshold in turn, and the upgrade steps of a
+ * release are run whenever the timestamp is below that release's threshold. If the timestamp is below
+ * none of the thresholds, nothing is changed and the given connection is returned as is.
+ * <p>
+ * Most steps reassign the connection from the value returned by the helper they call, so callers must
+ * continue with the returned connection, which may be a different object than the one passed in.
+ * @param metaConnection connection used to run the upgrade statements
+ * @param currentServerSideTableTimeStamp timestamp of the existing server-side SYSTEM.CATALOG table, compared
+ * against the MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_* constants
+ * @return the connection resulting from the last upgrade step that ran, or {@code metaConnection} itself if
+ * no step ran
+ * @throws SQLException declared for the upgrade statements and helpers called here
+ * @throws IOException declared for the calls made here (presumably the HBase admin calls - confirm)
+ * @throws TimeoutException declared for the helpers called here; not thrown directly by this method
+ * @throws InterruptedException declared for the helpers called here; not thrown directly by this method
+ */
+ // Available for testing
+ protected PhoenixConnection upgradeSystemCatalogIfRequired(PhoenixConnection metaConnection,
+ long currentServerSideTableTimeStamp) throws SQLException, IOException, TimeoutException, InterruptedException {
+ String columnsToAdd = "";
+ // This will occur if we have an older SYSTEM.CATALOG and we need to update it to
+ // include any new columns we've added.
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
+ // We know that we always need to add the STORE_NULLS column for 4.3 release
+ columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS
+ + " " + PBoolean.INSTANCE.getSqlTypeName());
+ try (HBaseAdmin admin = getAdmin()) {
+ HTableDescriptor[] localIndexTables = admin
+ .listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + ".*");
+ for (HTableDescriptor table : localIndexTables) {
+ if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
+ && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
+ table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
+ MetaDataUtil.getLocalIndexUserTableName(table.getNameAsString()));
+ // Explicitly disable, modify and enable the table to ensure
+ // co-location of data and index regions. If we just modify the
+ // table descriptor while online schema change is enabled, the region
+ // may reopen on the same region server instead of following the data region.
+ admin.disableTable(table.getTableName());
+ admin.modifyTable(table.getTableName(), table);
+ admin.enableTable(table.getTableName());
+ }
+ }
+ }
+ }
+
+ // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
+ // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
+ // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
+ // we should just have an ALTER TABLE ADD IF NOT EXISTS statement with all
+ // the column names that have been added to SYSTEM.CATALOG since 4.0.
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
+ columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " "
+ + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", "
+ + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " "
+ + PLong.INSTANCE.getSqlTypeName());
+ }
+
+ // If we have some new columns from 4.1-4.3 to add, add them now.
+ if (!columnsToAdd.isEmpty()) {
+ // Assign to another local variable first to keep Eclipse happy.
+ PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
+ metaConnection = newMetaConnection;
+ }
+
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
+ columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
+ + PInteger.INSTANCE.getSqlTypeName();
+ try {
+ metaConnection = addColumn(metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd,
+ false);
+ upgradeTo4_5_0(metaConnection);
+ } catch (ColumnAlreadyExistsException ignored) {
+ /*
+ * Upgrade to 4.5 is a slightly special case. We use the fact that the
+ * column BASE_COLUMN_COUNT is already part of the meta-data schema as the
+ * signal that the server side upgrade has finished or is in progress.
+ */
+ logger.debug("No need to run 4.5 upgrade");
+ }
+ Properties p = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+ p.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+ p.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+ PhoenixConnection conn = new PhoenixConnection(
+ ConnectionQueryServicesImpl.this, metaConnection.getURL(), p,
+ metaConnection.getMetaDataCache());
+ try {
+ List<String> tablesNeedingUpgrade = UpgradeUtil
+ .getPhysicalTablesWithDescRowKey(conn);
+ if (!tablesNeedingUpgrade.isEmpty()) {
+ logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n"
+ + Joiner.on(' ').join(tablesNeedingUpgrade)
+ + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
+ }
+ List<String> unsupportedTables = UpgradeUtil
+ .getPhysicalTablesWithDescVarbinaryRowKey(conn);
+ if (!unsupportedTables.isEmpty()) {
+ logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n"
+ + Joiner.on(' ').join(unsupportedTables));
+ }
+ } catch (Exception ex) {
+ logger.error(
+ "Unable to determine tables requiring upgrade due to PHOENIX-2067",
+ ex);
+ } finally {
+ conn.close();
+ }
+ }
+ // Add these columns one at a time, each with a different timestamp.
+ // That way, if folks have already run the upgrade code
+ // for a snapshot, we'll still enter this block
+ // and do the parts
+ // we haven't yet done.
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
+ columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " "
+ + PBoolean.INSTANCE.getSqlTypeName();
+ metaConnection = addColumnsIfNotExists(metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
+ }
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
+ // Drop old stats table so that new stats table is created
+ metaConnection = dropStatsTable(metaConnection,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
+ PhoenixDatabaseMetaData.TRANSACTIONAL + " "
+ + PBoolean.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
+ PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " "
+ + PLong.INSTANCE.getSqlTypeName());
+ metaConnection = setImmutableTableIndexesImmutable(metaConnection,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
+ metaConnection = updateSystemCatalogTimestamp(metaConnection,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
+ clearCache();
+ }
+
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
+ PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
+ + PBoolean.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
+ PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
+ + PVarchar.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
+ PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
+ + PBoolean.INSTANCE.getSqlTypeName());
+ metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
+ if (getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
+ QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
+ metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
+ }
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
+ clearCache();
+ }
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0) {
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0,
+ PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + " "
+ + PLong.INSTANCE.getSqlTypeName());
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0);
+ clearCache();
+ }
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) {
+ metaConnection = addColumnQualifierColumn(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 3);
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 2,
+ PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME + " "
+ + PTinyint.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 1,
+ PhoenixDatabaseMetaData.ENCODING_SCHEME + " "
+ + PTinyint.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0,
+ PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER + " "
+ + PInteger.INSTANCE.getSqlTypeName());
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0);
+ clearCache();
+ }
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0) {
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0,
+ PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION + " "
+ + PBoolean.INSTANCE.getSqlTypeName());
+ addParentToChildLinks(metaConnection);
+ }
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) {
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0,
+ PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + " "
+ + PTinyint.INSTANCE.getSqlTypeName());
+ }
+ return metaConnection;
+ }
+
+ /**
* There is no other locking needed here since only one connection (on the same or different JVM) will be able to
* acquire the upgrade mutex via {@link #acquireUpgradeMutex(long, byte[])}.
*/
@@ -2674,244 +2976,54 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl,
scnProps, newEmptyMetaData());
metaConnection.setRunningUpgrade(true);
+ // Always try to create SYSTEM.MUTEX table since we need it to acquire the upgrade mutex.
+ // Upgrade or migration is not possible without the upgrade mutex
+ try (HBaseAdmin admin = getAdmin()) {
+ createSysMutexTableIfNotExists(admin);
+ }
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
} catch (NewerTableAlreadyExistsException ignore) {
// Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed
// timestamp. A TableAlreadyExistsException is not thrown, since the table only exists
// *after* this fixed timestamp.
+ } catch (UpgradeRequiredException e) {
+ // This is thrown while trying to create SYSTEM:CATALOG to indicate that we must migrate SYSTEM tables
+ // to the SYSTEM namespace and/or upgrade SYSCAT if required
+ sysCatalogTableName = SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getNameAsString();
+ if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, ConnectionQueryServicesImpl.this.getProps())) {
+ // Try acquiring a lock in SYSMUTEX table before migrating the tables since it involves disabling the table.
+ if (acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey)) {
+ logger.debug("Acquired lock in SYSMUTEX table for migrating SYSTEM tables to SYSTEM namespace "
+ + "and/or upgrading " + sysCatalogTableName);
+ }
+ // We will not reach here if we fail to acquire the lock, since acquireUpgradeMutex throws UpgradeInProgressException in that case
+
+ // If SYSTEM tables exist, they are migrated to HBase SYSTEM namespace
+ // If they don't exist or they're already migrated, this method will return immediately
+ ensureSystemTablesMigratedToSystemNamespace();
+ logger.debug("Migrated SYSTEM tables to SYSTEM namespace");
+ metaConnection = upgradeSystemCatalogIfRequired(metaConnection, e.getSystemCatalogTimeStamp());
+ }
} catch (TableAlreadyExistsException e) {
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
sysCatalogTableName = e.getTable().getPhysicalName().getString();
if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) {
- if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) {
- try (HBaseAdmin admin = getAdmin()) {
- createSysMutexTableIfNotExists(admin, this.getProps());
- }
- }
+ // Try acquiring a lock in the SYSMUTEX table before upgrading SYSCAT. If we cannot acquire the lock,
+ // it means some other client is either migrating the SYSTEM tables or upgrading the schema of the
+ // SYSCAT table, and hence it should not be interrupted
if (acquiredMutexLock = acquireUpgradeMutex(currentServerSideTableTimeStamp, mutexRowKey)) {
+ logger.debug("Acquired lock in SYSMUTEX table for upgrading " + sysCatalogTableName);
snapshotName = getSysCatalogSnapshotName(currentServerSideTableTimeStamp);
createSnapshot(snapshotName, sysCatalogTableName);
snapshotCreated = true;
+ logger.debug("Created snapshot for SYSCAT");
}
+ // We will not reach here if we fail to acquire the lock, since acquireUpgradeMutex throws UpgradeInProgressException in that case
}
- String columnsToAdd = "";
- // This will occur if we have an older SYSTEM.CATALOG and we need to update it to
- // include any new columns we've added.
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
- // We know that we always need to add the STORE_NULLS column for 4.3 release
- columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS
- + " " + PBoolean.INSTANCE.getSqlTypeName());
- try (HBaseAdmin admin = getAdmin()) {
- HTableDescriptor[] localIndexTables = admin
- .listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + ".*");
- for (HTableDescriptor table : localIndexTables) {
- if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
- && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
- table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
- MetaDataUtil.getLocalIndexUserTableName(table.getNameAsString()));
- // Explicitly disable, modify and enable the table to ensure
- // co-location of data and index regions. If we just modify the
- // table descriptor when online schema change enabled may reopen
- // the region in same region server instead of following data region.
- admin.disableTable(table.getTableName());
- admin.modifyTable(table.getTableName(), table);
- admin.enableTable(table.getTableName());
- }
- }
- }
- }
-
- // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
- // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
- // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
- // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all
- // the column names that have been added to SYSTEM.CATALOG since 4.0.
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
- columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " "
- + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", "
- + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " "
- + PLong.INSTANCE.getSqlTypeName());
- }
-
- // If we have some new columns from 4.1-4.3 to add, add them now.
- if (!columnsToAdd.isEmpty()) {
- // Ugh..need to assign to another local variable to keep eclipse happy.
- PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
- metaConnection = newMetaConnection;
- }
-
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
- columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
- + PInteger.INSTANCE.getSqlTypeName();
- try {
- metaConnection = addColumn(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd,
- false);
- upgradeTo4_5_0(metaConnection);
- } catch (ColumnAlreadyExistsException ignored) {
- /*
- * Upgrade to 4.5 is a slightly special case. We use the fact that the
- * column BASE_COLUMN_COUNT is already part of the meta-data schema as the
- * signal that the server side upgrade has finished or is in progress.
- */
- logger.debug("No need to run 4.5 upgrade");
- }
- Properties p = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
- p.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
- p.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
- PhoenixConnection conn = new PhoenixConnection(
- ConnectionQueryServicesImpl.this, metaConnection.getURL(), p,
- metaConnection.getMetaDataCache());
- try {
- List<String> tablesNeedingUpgrade = UpgradeUtil
- .getPhysicalTablesWithDescRowKey(conn);
- if (!tablesNeedingUpgrade.isEmpty()) {
- logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n"
- + Joiner.on(' ').join(tablesNeedingUpgrade)
- + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
- }
- List<String> unsupportedTables = UpgradeUtil
- .getPhysicalTablesWithDescVarbinaryRowKey(conn);
- if (!unsupportedTables.isEmpty()) {
- logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n"
- + Joiner.on(' ').join(unsupportedTables));
- }
- } catch (Exception ex) {
- logger.error(
- "Unable to determine tables requiring upgrade due to PHOENIX-2067",
- ex);
- } finally {
- conn.close();
- }
- }
- // Add these columns one at a time, each with different timestamps so that if folks
- // have
- // run the upgrade code already for a snapshot, we'll still enter this block (and do
- // the
- // parts we haven't yet done).
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
- columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " "
- + PBoolean.INSTANCE.getSqlTypeName();
- metaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
- }
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
- // Drop old stats table so that new stats table is created
- metaConnection = dropStatsTable(metaConnection,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
- PhoenixDatabaseMetaData.TRANSACTIONAL + " "
- + PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
- PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " "
- + PLong.INSTANCE.getSqlTypeName());
- metaConnection = setImmutableTableIndexesImmutable(metaConnection,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
- metaConnection = updateSystemCatalogTimestamp(metaConnection,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
- ConnectionQueryServicesImpl.this.removeTable(null,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
- clearCache();
- }
-
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
- PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
- + PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
- PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
- + PVarchar.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
- PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
- + PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
- if (getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
- QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
- metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
- }
- ConnectionQueryServicesImpl.this.removeTable(null,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
- clearCache();
- }
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0) {
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0,
- PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + " "
- + PLong.INSTANCE.getSqlTypeName());
- ConnectionQueryServicesImpl.this.removeTable(null,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0);
- clearCache();
- }
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) {
- metaConnection = addColumnQualifierColumn(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 3);
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 2,
- PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME + " "
- + PTinyint.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 1,
- PhoenixDatabaseMetaData.ENCODING_SCHEME + " "
- + PTinyint.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0,
- PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER + " "
- + PInteger.INSTANCE.getSqlTypeName());
- ConnectionQueryServicesImpl.this.removeTable(null,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0);
- clearCache();
- }
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0) {
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0,
- PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION + " "
- + PBoolean.INSTANCE.getSqlTypeName());
- addParentToChildLinks(metaConnection);
- }
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) {
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0,
- PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + " "
- + PTinyint.INSTANCE.getSqlTypeName());
- }
+ metaConnection = upgradeSystemCatalogIfRequired(metaConnection, currentServerSideTableTimeStamp);
}
-
int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(
QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
@@ -2997,6 +3109,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA);
} catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
+
+ // In case namespace mapping is enabled and system table to system namespace mapping is also enabled,
+ // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work
+ // with SYSTEM Namespace
+ createSchemaIfNotExistsSystemNSMappingEnabled(metaConnection);
+
ConnectionQueryServicesImpl.this.upgradeRequired.set(false);
success = true;
} catch (UpgradeInProgressException | UpgradeNotRequiredException e) {
@@ -3218,28 +3336,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- void ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps props)
+ void ensureSystemTablesMigratedToSystemNamespace()
throws SQLException, IOException, IllegalArgumentException, InterruptedException {
- if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; }
-
- boolean acquiredMutexLock = false;
- byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+ if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, this.getProps())) { return; }
HTableInterface metatable = null;
try (HBaseAdmin admin = getAdmin()) {
- // SYSTEM namespace needs to be created via HBase API's because "CREATE SCHEMA" statement tries to write its metadata
- // in SYSTEM:CATALOG table. Without SYSTEM namespace, SYSTEM:CATALOG table cannot be created.
- try {
- ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME);
- } catch (PhoenixIOException e) {
- // We could either:
- // 1) Not access the NS descriptor. The NS may or may not exist at this point.
- // 2) We could not create the NS
- // Regardless of the case 1 or 2, if the NS does not exist, we will error expectedly
- // below. If the NS does exist and is mapped, the below check will exit gracefully.
- }
-
List<TableName> tableNames = getSystemTableNamesInDefaultNamespace(admin);
// No tables exist matching "SYSTEM\..*", they are all already in "SYSTEM:.*"
if (tableNames.size() == 0) { return; }
@@ -3248,33 +3350,22 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
logger.warn("Expected 5 system tables but found " + tableNames.size() + ":" + tableNames);
}
- // Try acquiring a lock in SYSMUTEX table before migrating the tables since it involves disabling the table
- // If we cannot acquire lock, it means some old client is either migrating SYSCAT or trying to upgrade the
- // schema of SYSCAT table and hence it should not be interrupted
- // Create mutex if not already created
- createSysMutexTableIfNotExists(admin, props);
- acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey);
- if(acquiredMutexLock) {
- logger.debug("Acquired lock in SYSMUTEX table for migrating SYSTEM tables to SYSTEM namespace");
- }
- // We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException
-
// Handle the upgrade of SYSMUTEX table separately since it doesn't have any entries in SYSCAT
logger.info("Migrating SYSTEM.MUTEX table to SYSTEM namespace.");
String sysMutexSrcTableName = PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME;
- String sysMutexDestTableName = SchemaUtil.getPhysicalName(sysMutexSrcTableName.getBytes(), props).getNameAsString();
+ String sysMutexDestTableName = SchemaUtil.getPhysicalName(sysMutexSrcTableName.getBytes(), this.getProps()).getNameAsString();
UpgradeUtil.mapTableToNamespace(admin, sysMutexSrcTableName, sysMutexDestTableName, PTableType.SYSTEM);
tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME);
byte[] mappedSystemTable = SchemaUtil
- .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName();
+ .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName();
metatable = getTable(mappedSystemTable);
if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)) {
if (!admin.tableExists(mappedSystemTable)) {
logger.info("Migrating SYSTEM.CATALOG table to SYSTEM namespace.");
// Actual migration of SYSCAT table
UpgradeUtil.mapTableToNamespace(admin, metatable,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, this.getProps(), null, PTableType.SYSTEM,
null);
// Invalidate the client-side metadataCache
ConnectionQueryServicesImpl.this.removeTable(null,
@@ -3285,7 +3376,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
for (TableName table : tableNames) {
logger.info(String.format("Migrating %s table to SYSTEM namespace.", table.getNameAsString()));
- UpgradeUtil.mapTableToNamespace(admin, metatable, table.getNameAsString(), props, null, PTableType.SYSTEM,
+ UpgradeUtil.mapTableToNamespace(admin, metatable, table.getNameAsString(), this.getProps(), null, PTableType.SYSTEM,
null);
ConnectionQueryServicesImpl.this.removeTable(null, table.getNameAsString(), null,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
@@ -3297,9 +3388,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (metatable != null) {
metatable.close();
}
- if(acquiredMutexLock) {
- releaseUpgradeMutex(mutexRowKey);
- }
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cda8141/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index aa8209d..14abd63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -242,7 +242,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
@Override
public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] physicalName, PTableType tableType,
Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits,
- boolean isNamespaceMapped, boolean allocateIndexId) throws SQLException {
+ boolean isNamespaceMapped, boolean allocateIndexId, boolean isDoNotUpgradePropSet) throws SQLException {
if (tableType == PTableType.INDEX && IndexUtil.isLocalIndexFamily(Bytes.toString(families.iterator().next().getFirst()))) {
Object dataTableName = tableProps.get(PhoenixDatabaseMetaData.DATA_TABLE_NAME);
List<HRegionLocation> regionLocations = tableSplits.get(dataTableName);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cda8141/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index ed9b9da..0d0df37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -114,9 +114,9 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
@Override
public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] physicalName, PTableType tableType,
Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits,
- boolean isNamespaceMapped, boolean allocateIndexId) throws SQLException {
+ boolean isNamespaceMapped, boolean allocateIndexId, boolean isDoNotUpgradePropSet) throws SQLException {
return getDelegate().createTable(tableMetaData, physicalName, tableType, tableProps, families, splits,
- isNamespaceMapped, allocateIndexId);
+ isNamespaceMapped, allocateIndexId, isDoNotUpgradePropSet);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cda8141/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 1fb668e..b15072a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2694,7 +2694,8 @@ public class MetaDataClient {
MetaDataMutationResult result = connection.getQueryServices().createTable(
tableMetaData,
viewType == ViewType.MAPPED || allocateIndexId ? physicalNames.get(0).getBytes() : null,
- tableType, tableProps, familyPropList, splits, isNamespaceMapped, allocateIndexId);
+ tableType, tableProps, familyPropList, splits, isNamespaceMapped, allocateIndexId,
+ UpgradeUtil.isNoUpgradeSet(connection.getClientInfo()));
MutationCode code = result.getMutationCode();
switch(code) {
case TABLE_ALREADY_EXISTS:
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cda8141/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index b5c3e4a..46907d9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -47,9 +47,9 @@ public class ConnectionQueryServicesImplTest {
ConnectionQueryServicesImpl cqs = mock(ConnectionQueryServicesImpl.class);
// Invoke the real methods for these two calls
when(cqs.createSchema(any(List.class), anyString())).thenCallRealMethod();
- doCallRealMethod().when(cqs).ensureSystemTablesMigratedToSystemNamespace(any(ReadOnlyProps.class));
+ doCallRealMethod().when(cqs).ensureSystemTablesMigratedToSystemNamespace();
// Do nothing for this method, just check that it was invoked later
- doNothing().when(cqs).createSysMutexTableIfNotExists(any(HBaseAdmin.class), any(ReadOnlyProps.class));
+ doNothing().when(cqs).createSysMutexTableIfNotExists(any(HBaseAdmin.class));
// Spoof out this call so that ensureSystemTablesUpgrade() will return-fast.
when(cqs.getSystemTableNamesInDefaultNamespace(any(HBaseAdmin.class))).thenReturn(Collections.<TableName> emptyList());
@@ -60,7 +60,8 @@ public class ConnectionQueryServicesImplTest {
// Make sure that ensureSystemTablesMigratedToSystemNamespace will try to migrate the system tables.
Map<String,String> props = new HashMap<>();
props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
- cqs.ensureSystemTablesMigratedToSystemNamespace(new ReadOnlyProps(props));
+ when(cqs.getProps()).thenReturn(new ReadOnlyProps(props));
+ cqs.ensureSystemTablesMigratedToSystemNamespace();
// Should be called after upgradeSystemTables()
// Proves that execution proceeded
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cda8141/phoenix-protocol/src/main/MetaDataService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto
index 2ba2b4c..369522c 100644
--- a/phoenix-protocol/src/main/MetaDataService.proto
+++ b/phoenix-protocol/src/main/MetaDataService.proto
@@ -168,6 +168,7 @@ message GetVersionRequest {
message GetVersionResponse {
required int64 version = 1;
+ optional int64 systemCatalogTimestamp = 2;
}
message ClearTableFromCacheRequest {