You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/09/20 18:06:06 UTC
[43/47] phoenix git commit: PHOENIX-3174 Make minor upgrade a manual
step
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1aa9b88..dfe7ee8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -63,6 +63,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
@@ -131,6 +132,8 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateR
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.exception.UpgradeInProgressException;
+import org.apache.phoenix.exception.UpgradeNotRequiredException;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
import org.apache.phoenix.hbase.index.Indexer;
@@ -266,6 +269,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues;
private ScheduledExecutorService renewLeaseExecutor;
private final boolean renewLeaseEnabled;
+ private final boolean isAutoUpgradeEnabled;
+ private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
private static interface FeatureSupported {
boolean isSupported(ConnectionQueryServices services);
@@ -342,6 +347,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
connectionQueues = ImmutableList.copyOf(list);
// A little bit of a smell to leak `this` here, but should not be a problem
this.tableStatsCache = new TableStatsCache(this, config);
+ this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED);
}
@Override
@@ -2310,29 +2316,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return null;
}
checkClosed();
- PhoenixConnection metaConnection = null;
- boolean success = false;
- String snapshotName = null;
- String sysCatalogTableName = null;
try {
openConnection();
- String noUpgradeProp = props.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB);
- boolean upgradeSystemTables = !Boolean.TRUE.equals(Boolean.valueOf(noUpgradeProp));
- Properties scnProps = PropertiesUtil.deepCopy(props);
- scnProps.setProperty(
- PhoenixRuntime.CURRENT_SCN_ATTRIB,
- Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
- scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
- String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
- metaConnection = new PhoenixConnection(
- ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData());
+ boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props);
try (HBaseAdmin admin = getAdmin()) {
boolean mappedSystemCatalogExists = admin
.tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true));
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
ConnectionQueryServicesImpl.this.getProps())) {
if (admin.tableExists(SYSTEM_CATALOG_NAME_BYTES)) {
- //check if the server is already updated and have namespace config properly set.
+ //check if the server is already updated and have namespace config properly set.
checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES);
}
ensureSystemTablesUpgraded(ConnectionQueryServicesImpl.this.getProps());
@@ -2345,232 +2338,32 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
+ IS_NAMESPACE_MAPPING_ENABLED + " enabled")
.build().buildException(); }
}
- try {
- metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
- } catch (NewerTableAlreadyExistsException ignore) {
- // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp.
- // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
- } catch (TableAlreadyExistsException e) {
- if (upgradeSystemTables) {
- long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
- sysCatalogTableName = e.getTable().getPhysicalName().getString();
- if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable().getPhysicalName().getBytes())) {
- snapshotName = getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp);
- createSnapshot(snapshotName, sysCatalogTableName);
- }
- String columnsToAdd = "";
- // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
- // any new columns we've added.
- if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
- // We know that we always need to add the STORE_NULLS column for 4.3 release
- columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName());
- try (HBaseAdmin admin = getAdmin()) {
- HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
- for (HTableDescriptor table : localIndexTables) {
- if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
- && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
- table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
- MetaDataUtil.getUserTableName(table
- .getNameAsString()));
- // Explicitly disable, modify and enable the table to ensure co-location of data
- // and index regions. If we just modify the table descriptor when online schema
- // change enabled may reopen the region in same region server instead of following data region.
- admin.disableTable(table.getTableName());
- admin.modifyTable(table.getTableName(), table);
- admin.enableTable(table.getTableName());
- }
- }
- }
- }
-
- // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
- // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
- // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
- // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all
- // the column names that have been added to SYSTEM.CATALOG since 4.0.
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
- columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
- + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName());
- }
-
- // If we have some new columns from 4.1-4.3 to add, add them now.
- if (!columnsToAdd.isEmpty()) {
- // Ugh..need to assign to another local variable to keep eclipse happy.
- PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
- metaConnection = newMetaConnection;
- }
-
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
- columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
- + PInteger.INSTANCE.getSqlTypeName();
- try {
- metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false);
- upgradeTo4_5_0(metaConnection);
- } catch (ColumnAlreadyExistsException ignored) {
- /*
- * Upgrade to 4.5 is a slightly special case. We use the fact that the column
- * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that
- * the server side upgrade has finished or is in progress.
- */
- logger.debug("No need to run 4.5 upgrade");
- }
- Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
- props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
- props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
- PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache());
- try {
- List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn);
- if (!tablesNeedingUpgrade.isEmpty()) {
- logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
- }
- List<String> unsupportedTables = UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn);
- if (!unsupportedTables.isEmpty()) {
- logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables));
- }
- } catch (Exception ex) {
- logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex);
- } finally {
- conn.close();
- }
- }
- // Add these columns one at a time, each with different timestamps so that if folks have
- // run the upgrade code already for a snapshot, we'll still enter this block (and do the
- // parts we haven't yet done).
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
- columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName();
- metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
- }
- if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
- // Drop old stats table so that new stats table is created
- metaConnection = dropStatsTable(metaConnection,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
- metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
- PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
- PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + PLong.INSTANCE.getSqlTypeName());
- metaConnection = setImmutableTableIndexesImmutable(metaConnection,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
- metaConnection = updateSystemCatalogTimestamp(metaConnection,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
- ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
- clearCache();
- }
-
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
- metaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
- PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
- + PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
- PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
- + PVarchar.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
- PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
- + PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
- if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
- QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
- metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
- }
- ConnectionQueryServicesImpl.this.removeTable(null,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
- clearCache();
- }
-
- }
- }
-
- int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
- QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
- try {
- String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets);
- metaConnection.createStatement().executeUpdate(createSequenceTable);
- nSequenceSaltBuckets = nSaltBuckets;
- } catch (NewerTableAlreadyExistsException e) {
- // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
- // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
- nSequenceSaltBuckets = getSaltBuckets(e);
- } catch (TableAlreadyExistsException e) {
- if (upgradeSystemTables) {
- // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to include
- // any new columns we've added.
+ Properties scnProps = PropertiesUtil.deepCopy(props);
+ scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+ Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+ scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+ String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
+ try (PhoenixConnection metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl,
+ scnProps, newEmptyMetaData())) {
+ try {
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+ } catch (NewerTableAlreadyExistsException ignore) {
+ // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed
+ // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists
+ // *after* this fixed timestamp.
+ } catch (TableAlreadyExistsException e) {
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
- // If the table time stamp is before 4.1.0 then we need to add below columns
- // to the SYSTEM.SEQUENCE table.
- String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
- + ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
- + ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName()
- + ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName();
- addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
- }
- // If the table timestamp is before 4.2.1 then run the upgrade script
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) {
- if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
- metaConnection.removeTable(null,
- PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA,
- PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
- clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
- PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES,
- PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
- clearTableRegionCache(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES);
- }
- nSequenceSaltBuckets = nSaltBuckets;
- } else {
- nSequenceSaltBuckets = getSaltBuckets(e);
+ if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) {
+ ConnectionQueryServicesImpl.this.upgradeRequired.set(true);
}
}
- }
- try {
- metaConnection.createStatement().executeUpdate(
- QueryConstants.CREATE_STATS_TABLE_METADATA);
- } catch (NewerTableAlreadyExistsException ignore) {
- } catch(TableAlreadyExistsException e) {
- if (upgradeSystemTables) {
- long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
- metaConnection = addColumnsIfNotExists(
- metaConnection,
- SYSTEM_STATS_NAME,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
- PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
- + PLong.INSTANCE.getSqlTypeName());
- }
+ if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
+ createOtherSystemTables(metaConnection);
+ } else if (isAutoUpgradeEnabled && !isDoNotUpgradePropSet) {
+ upgradeSystemTables(url, props);
}
}
- try {
- metaConnection.createStatement().executeUpdate(
- QueryConstants.CREATE_FUNCTION_METADATA);
- } catch (NewerTableAlreadyExistsException e) {
- } catch (TableAlreadyExistsException e) {
- }
- if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
- ConnectionQueryServicesImpl.this.getProps())) {
- try {
- metaConnection.createStatement().executeUpdate("CREATE SCHEMA IF NOT EXISTS "
- + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
- } catch (NewerSchemaAlreadyExistsException e) {}
- }
- success = true;
scheduleRenewLeaseTasks();
- } catch (UpgradeInProgressException e) {
- // don't set it as initializationException because otherwise client won't be able to retry
- throw e;
} catch (Exception e) {
if (e instanceof SQLException) {
initializationException = (SQLException)e;
@@ -2579,222 +2372,517 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
initializationException = new SQLException(e);
}
} finally {
- try {
- if (metaConnection != null) metaConnection.close();
- } catch (SQLException e) {
- if (initializationException != null) {
- initializationException.setNextException(e);
- } else {
- initializationException = e;
- }
- } finally {
- try {
- restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
- } catch (SQLException e) {
- if (initializationException != null) {
- initializationException.setNextException(e);
- } else {
- initializationException = e;
- }
- }
- try {
- if (initializationException != null) {
- throw initializationException;
- }
- } finally {
- initialized = true;
- }
- }
+ initialized = true;
}
- }
+ }
return null;
}
-
- private void createSnapshot(String snapshotName, String tableName)
- throws SQLException {
- HBaseAdmin admin = null;
- SQLException sqlE = null;
- try {
- admin = getAdmin();
- admin.snapshot(snapshotName, tableName);
- logger.info("Successfully created snapshot " + snapshotName + " for "
- + tableName);
- } catch (Exception e) {
- sqlE = new SQLException(e);
- } finally {
- try {
- if (admin != null) {
- admin.close();
- }
- } catch (Exception e) {
- SQLException adminCloseEx = new SQLException(e);
- if (sqlE == null) {
- sqlE = adminCloseEx;
- } else {
- sqlE.setNextException(adminCloseEx);
- }
- } finally {
- if (sqlE != null) {
- throw sqlE;
+ });
+ } catch (Exception e) {
+ Throwables.propagateIfInstanceOf(e, SQLException.class);
+ Throwables.propagate(e);
+ }
+ }
+
+ private void createOtherSystemTables(PhoenixConnection metaConnection) throws SQLException {
+ try {
+ metaConnection.createStatement().execute(QueryConstants.CREATE_SEQUENCE_METADATA);
+ } catch (TableAlreadyExistsException ignore) {}
+ try {
+ metaConnection.createStatement().execute(QueryConstants.CREATE_STATS_TABLE_METADATA);
+ } catch (TableAlreadyExistsException ignore) {}
+ try {
+ metaConnection.createStatement().execute(QueryConstants.CREATE_FUNCTION_METADATA);
+ } catch (TableAlreadyExistsException ignore) {}
+ }
+
+ /**
+ * There is no other locking needed here since only one connection (on the same or different JVM) will be able to
+ * acquire the upgrade mutex via {@link #acquireUpgradeMutex(long, byte[])}.
+ */
+ @Override
+ public void upgradeSystemTables(final String url, final Properties props) throws SQLException {
+ PhoenixConnection metaConnection = null;
+ boolean success = false;
+ String snapshotName = null;
+ String sysCatalogTableName = null;
+ SQLException toThrow = null;
+ try {
+ if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
+ throw new UpgradeNotRequiredException();
+ }
+ Properties scnProps = PropertiesUtil.deepCopy(props);
+ scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+ Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+ scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+ String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
+ metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl,
+ scnProps, newEmptyMetaData());
+ metaConnection.setRunningUpgrade(true);
+ try {
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+ } catch (NewerTableAlreadyExistsException ignore) {
+ // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed
+ // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists
+ // *after* this fixed timestamp.
+ } catch (TableAlreadyExistsException e) {
+ long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+ sysCatalogTableName = e.getTable().getPhysicalName().getString();
+ if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP
+ && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable()
+ .getPhysicalName().getBytes())) {
+ snapshotName = getUpgradeSnapshotName(sysCatalogTableName,
+ currentServerSideTableTimeStamp);
+ createSnapshot(snapshotName, sysCatalogTableName);
+ }
+ String columnsToAdd = "";
+ // This will occur if we have an older SYSTEM.CATALOG and we need to update it to
+ // include any new columns we've added.
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
+ // We know that we always need to add the STORE_NULLS column for 4.3 release
+ columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS
+ + " " + PBoolean.INSTANCE.getSqlTypeName());
+ try (HBaseAdmin admin = getAdmin()) {
+ HTableDescriptor[] localIndexTables = admin
+ .listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + ".*");
+ for (HTableDescriptor table : localIndexTables) {
+ if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
+ && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
+ table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
+ MetaDataUtil.getUserTableName(table.getNameAsString()));
+ // Explicitly disable, modify and enable the table to ensure
+ // co-location of data and index regions. If we just modify the
+ // table descriptor when online schema change enabled may reopen
+ // the region in same region server instead of following data region.
+ admin.disableTable(table.getTableName());
+ admin.modifyTable(table.getTableName(), table);
+ admin.enableTable(table.getTableName());
}
}
}
}
- private void restoreFromSnapshot(String tableName, String snapshotName,
- boolean success) throws SQLException {
- boolean snapshotRestored = false;
- boolean tableDisabled = false;
- if (!success && snapshotName != null) {
- SQLException sqlE = null;
- HBaseAdmin admin = null;
- try {
- logger.warn("Starting restore of " + tableName + " using snapshot "
- + snapshotName + " because upgrade failed");
- admin = getAdmin();
- admin.disableTable(tableName);
- tableDisabled = true;
- admin.restoreSnapshot(snapshotName);
- snapshotRestored = true;
- logger.warn("Successfully restored " + tableName + " using snapshot "
- + snapshotName);
- } catch (Exception e) {
- sqlE = new SQLException(e);
- } finally {
- if (admin != null && tableDisabled) {
- try {
- admin.enableTable(tableName);
- if (snapshotRestored) {
- logger.warn("Successfully restored and enabled " + tableName + " using snapshot "
- + snapshotName);
- } else {
- logger.warn("Successfully enabled " + tableName + " after restoring using snapshot "
- + snapshotName + " failed. ");
- }
- } catch (Exception e1) {
- SQLException enableTableEx = new SQLException(e1);
- if (sqlE == null) {
- sqlE = enableTableEx;
- } else {
- sqlE.setNextException(enableTableEx);
- }
- logger.error("Failure in enabling "
- + tableName
- + (snapshotRestored ? " after successfully restoring using snapshot"
- + snapshotName
- : " after restoring using snapshot "
- + snapshotName + " failed. "));
- } finally {
- try {
- admin.close();
- } catch (Exception e2) {
- SQLException adminCloseEx = new SQLException(e2);
- if (sqlE == null) {
- sqlE = adminCloseEx;
- } else {
- sqlE.setNextException(adminCloseEx);
- }
- } finally {
- if (sqlE != null) {
- throw sqlE;
- }
- }
- }
- }
- }
- }
+ // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
+ // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
+ // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
+ // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all
+ // the column names that have been added to SYSTEM.CATALOG since 4.0.
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
+ columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " "
+ + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", "
+ + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " "
+ + PLong.INSTANCE.getSqlTypeName());
}
- private void ensureSystemTablesUpgraded(ReadOnlyProps props)
- throws SQLException, IOException, IllegalArgumentException, InterruptedException {
- if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; }
- HTableInterface metatable = null;
- try (HBaseAdmin admin = getAdmin()) {
- ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME);
- List<HTableDescriptor> tables = Arrays
- .asList(admin.listTables(QueryConstants.SYSTEM_SCHEMA_NAME + "\\..*", false));
- List<String> tableNames = getTableNames(tables);
- if (tableNames.size() == 0) { return; }
- if (tableNames.size() > 4) { throw new IllegalArgumentException(
- "Expected 4 system table only but found " + tableNames.size() + ":" + tableNames); }
- byte[] mappedSystemTable = SchemaUtil
- .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName();
- metatable = getTable(mappedSystemTable);
- if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)) {
- if (!admin.tableExists(mappedSystemTable)) {
- UpgradeUtil.mapTableToNamespace(admin, metatable,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM,
- null);
- ConnectionQueryServicesImpl.this.removeTable(null,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
- }
- tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
- }
- for (String table : tableNames) {
- UpgradeUtil.mapTableToNamespace(admin, metatable, table, props, null, PTableType.SYSTEM,
- null);
- ConnectionQueryServicesImpl.this.removeTable(null, table, null,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
+ // If we have some new columns from 4.1-4.3 to add, add them now.
+ if (!columnsToAdd.isEmpty()) {
+ // Ugh..need to assign to another local variable to keep eclipse happy.
+ PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
+ metaConnection = newMetaConnection;
+ }
+
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
+ columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
+ + PInteger.INSTANCE.getSqlTypeName();
+ try {
+ metaConnection = addColumn(metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd,
+ false);
+ upgradeTo4_5_0(metaConnection);
+ } catch (ColumnAlreadyExistsException ignored) {
+ /*
+ * Upgrade to 4.5 is a slightly special case. We use the fact that the
+ * column BASE_COLUMN_COUNT is already part of the meta-data schema as the
+ * signal that the server side upgrade has finished or is in progress.
+ */
+ logger.debug("No need to run 4.5 upgrade");
+ }
+ Properties p = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+ p.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+ p.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+ PhoenixConnection conn = new PhoenixConnection(
+ ConnectionQueryServicesImpl.this, metaConnection.getURL(), p,
+ metaConnection.getMetaDataCache());
+ try {
+ List<String> tablesNeedingUpgrade = UpgradeUtil
+ .getPhysicalTablesWithDescRowKey(conn);
+ if (!tablesNeedingUpgrade.isEmpty()) {
+ logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n"
+ + Joiner.on(' ').join(tablesNeedingUpgrade)
+ + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
}
- if (!tableNames.isEmpty()) {
- clearCache();
+ List<String> unsupportedTables = UpgradeUtil
+ .getPhysicalTablesWithDescVarbinaryRowKey(conn);
+ if (!unsupportedTables.isEmpty()) {
+ logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n"
+ + Joiner.on(' ').join(unsupportedTables));
}
+ } catch (Exception ex) {
+ logger.error(
+ "Unable to determine tables requiring upgrade due to PHOENIX-2067",
+ ex);
} finally {
- if (metatable != null) {
- metatable.close();
- }
+ conn.close();
+ }
+ }
+ // Add these columns one at a time, each with different timestamps so that if folks
+ // have
+ // run the upgrade code already for a snapshot, we'll still enter this block (and do
+ // the
+ // parts we haven't yet done).
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
+ columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " "
+ + PBoolean.INSTANCE.getSqlTypeName();
+ metaConnection = addColumnsIfNotExists(metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
+ }
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
+ // Drop old stats table so that new stats table is created
+ metaConnection = dropStatsTable(metaConnection,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
+ PhoenixDatabaseMetaData.TRANSACTIONAL + " "
+ + PBoolean.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
+ PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " "
+ + PLong.INSTANCE.getSqlTypeName());
+ metaConnection = setImmutableTableIndexesImmutable(metaConnection,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
+ metaConnection = updateSystemCatalogTimestamp(metaConnection,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
+ clearCache();
+ }
+
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
+ PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
+ + PBoolean.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
+ PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
+ + PVarchar.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
+ PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
+ + PBoolean.INSTANCE.getSqlTypeName());
+ metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
+ if (getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
+ QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
+ metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
}
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
+ clearCache();
}
+ }
+
- /**
- * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
- * making use of HBase's checkAndPut api.
- * <p>
- * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the
- * version cell will be null i.e. the {@value QueryConstants#UPGRADE_MUTEX} column will be non-existent. For client's
- * upgrading to a release newer than 4.8.1 the existing version cell will be non-null. The client which
- * wins the race will end up setting the version cell to the {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP}
- * for the release.
- * </p>
- *
- * @return true if client won the race, false otherwise
- * @throws IOException
- * @throws SQLException
- */
- private boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException,
- SQLException {
- Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
- try (HTableInterface sysCatalogTable = getTable(sysCatalogTableName)) {
- byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
- byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
- byte[] qualifier = QueryConstants.UPGRADE_MUTEX;
- byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 ? null
- : PLong.INSTANCE.toBytes(currentServerSideTableTimestamp);
- byte[] newValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
- // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX column isn't used
- // to calculate SYSTEM.CATALOG's server side timestamp.
- Put put = new Put(row);
- put.add(family, qualifier, newValue);
- boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put);
- if (!acquired) { throw new UpgradeInProgressException(
- getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
- return true;
+ int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(
+ QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
+ QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ try {
+ String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets);
+ metaConnection.createStatement().executeUpdate(createSequenceTable);
+ nSequenceSaltBuckets = nSaltBuckets;
+ } catch (NewerTableAlreadyExistsException e) {
+ // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed
+ // timestamp.
+ // A TableAlreadyExistsException is not thrown, since the table only exists *after* this
+ // fixed timestamp.
+ nSequenceSaltBuckets = getSaltBuckets(e);
+ } catch (TableAlreadyExistsException e) {
+ // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to
+ // include
+ // any new columns we've added.
+ long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
+ // If the table time stamp is before 4.1.0 then we need to add below columns
+ // to the SYSTEM.SEQUENCE table.
+ String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " "
+ + PLong.INSTANCE.getSqlTypeName() + ", "
+ + PhoenixDatabaseMetaData.MAX_VALUE + " "
+ + PLong.INSTANCE.getSqlTypeName() + ", "
+ + PhoenixDatabaseMetaData.CYCLE_FLAG + " "
+ + PBoolean.INSTANCE.getSqlTypeName() + ", "
+ + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " "
+ + PBoolean.INSTANCE.getSqlTypeName();
+ addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
+ }
+ // If the table timestamp is before 4.2.1 then run the upgrade script
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) {
+ if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
+ metaConnection.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA,
+ PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+ clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
+ PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES,
+ PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+ clearTableRegionCache(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES);
}
+ nSequenceSaltBuckets = nSaltBuckets;
+ } else {
+ nSequenceSaltBuckets = getSaltBuckets(e);
}
- });
+ }
+ try {
+ metaConnection.createStatement().executeUpdate(
+ QueryConstants.CREATE_STATS_TABLE_METADATA);
+ } catch (NewerTableAlreadyExistsException ignore) {} catch (TableAlreadyExistsException e) {
+ long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ SYSTEM_STATS_NAME,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+ PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
+ + PLong.INSTANCE.getSqlTypeName());
+ }
+ }
+ try {
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
+ } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
+ if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
+ ConnectionQueryServicesImpl.this.getProps())) {
+ try {
+ metaConnection.createStatement().executeUpdate(
+ "CREATE SCHEMA IF NOT EXISTS "
+ + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+ } catch (NewerSchemaAlreadyExistsException e) {}
+ }
+ ConnectionQueryServicesImpl.this.upgradeRequired.set(false);
+ success = true;
+ } catch (UpgradeInProgressException | UpgradeNotRequiredException e) {
+ // don't set it as initializationException because otherwise client won't be able to retry
+ throw e;
} catch (Exception e) {
- Throwables.propagateIfInstanceOf(e, SQLException.class);
- Throwables.propagate(e);
+ if (e instanceof SQLException) {
+ toThrow = (SQLException)e;
+ } else {
+ // wrap every other exception into a SQLException
+ toThrow = new SQLException(e);
+ }
+ } finally {
+ try {
+ if (metaConnection != null) {
+ metaConnection.close();
+ }
+ } catch (SQLException e) {
+ if (toThrow != null) {
+ toThrow.setNextException(e);
+ } else {
+ toThrow = e;
+ }
+ } finally {
+ try {
+ restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
+ } catch (SQLException e) {
+ if (toThrow != null) {
+ toThrow.setNextException(e);
+ } else {
+ toThrow = e;
+ }
+ }
+ if (toThrow != null) { throw toThrow; }
+ }
+ }
+ }
+
+ private void createSnapshot(String snapshotName, String tableName)
+ throws SQLException {
+ HBaseAdmin admin = null;
+ SQLException sqlE = null;
+ try {
+ admin = getAdmin();
+ admin.snapshot(snapshotName, tableName);
+ logger.info("Successfully created snapshot " + snapshotName + " for "
+ + tableName);
+ } catch (Exception e) {
+ sqlE = new SQLException(e);
+ } finally {
+ try {
+ if (admin != null) {
+ admin.close();
+ }
+ } catch (Exception e) {
+ SQLException adminCloseEx = new SQLException(e);
+ if (sqlE == null) {
+ sqlE = adminCloseEx;
+ } else {
+ sqlE.setNextException(adminCloseEx);
+ }
+ } finally {
+ if (sqlE != null) {
+ throw sqlE;
+ }
+ }
}
}
- private static class UpgradeInProgressException extends SQLException {
- public UpgradeInProgressException(String upgradeFrom, String upgradeTo) {
- super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo
- + ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS
- .getSQLState(), SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode());
+ private void restoreFromSnapshot(String tableName, String snapshotName,
+ boolean success) throws SQLException {
+ boolean snapshotRestored = false;
+ boolean tableDisabled = false;
+ if (!success && snapshotName != null) {
+ SQLException sqlE = null;
+ HBaseAdmin admin = null;
+ try {
+ logger.warn("Starting restore of " + tableName + " using snapshot "
+ + snapshotName + " because upgrade failed");
+ admin = getAdmin();
+ admin.disableTable(tableName);
+ tableDisabled = true;
+ admin.restoreSnapshot(snapshotName);
+ snapshotRestored = true;
+ logger.warn("Successfully restored " + tableName + " using snapshot "
+ + snapshotName);
+ } catch (Exception e) {
+ sqlE = new SQLException(e);
+ } finally {
+ if (admin != null && tableDisabled) {
+ try {
+ admin.enableTable(tableName);
+ if (snapshotRestored) {
+ logger.warn("Successfully restored and enabled " + tableName + " using snapshot "
+ + snapshotName);
+ } else {
+ logger.warn("Successfully enabled " + tableName + " after restoring using snapshot "
+ + snapshotName + " failed. ");
+ }
+ } catch (Exception e1) {
+ SQLException enableTableEx = new SQLException(e1);
+ if (sqlE == null) {
+ sqlE = enableTableEx;
+ } else {
+ sqlE.setNextException(enableTableEx);
+ }
+ logger.error("Failure in enabling "
+ + tableName
+ + (snapshotRestored ? " after successfully restoring using snapshot"
+ + snapshotName
+ : " after restoring using snapshot "
+ + snapshotName + " failed. "));
+ } finally {
+ try {
+ admin.close();
+ } catch (Exception e2) {
+ SQLException adminCloseEx = new SQLException(e2);
+ if (sqlE == null) {
+ sqlE = adminCloseEx;
+ } else {
+ sqlE.setNextException(adminCloseEx);
+ }
+ } finally {
+ if (sqlE != null) {
+ throw sqlE;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void ensureSystemTablesUpgraded(ReadOnlyProps props)
+ throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+ if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; }
+ HTableInterface metatable = null;
+ try (HBaseAdmin admin = getAdmin()) {
+ ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME);
+ List<HTableDescriptor> tables = Arrays
+ .asList(admin.listTables(QueryConstants.SYSTEM_SCHEMA_NAME + "\\..*"));
+ List<String> tableNames = getTableNames(tables);
+ if (tableNames.size() == 0) { return; }
+ if (tableNames.size() > 4) { throw new IllegalArgumentException(
+ "Expected 4 system table only but found " + tableNames.size() + ":" + tableNames); }
+ byte[] mappedSystemTable = SchemaUtil
+ .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName();
+ metatable = getTable(mappedSystemTable);
+ if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)) {
+ if (!admin.tableExists(mappedSystemTable)) {
+ UpgradeUtil.mapTableToNamespace(admin, metatable,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM,
+ null);
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
+ }
+ tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+ }
+ for (String table : tableNames) {
+ UpgradeUtil.mapTableToNamespace(admin, metatable, table, props, null, PTableType.SYSTEM,
+ null);
+ ConnectionQueryServicesImpl.this.removeTable(null, table, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
+ }
+ if (!tableNames.isEmpty()) {
+ clearCache();
+ }
+ } finally {
+ if (metatable != null) {
+ metatable.close();
+ }
+ }
+ }
+
+ /**
+ * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
+ * making use of HBase's checkAndPut api.
+ * <p>
+ * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the
+ * cell will be null i.e. the {@value QueryConstants#UPGRADE_MUTEX} column will be non-existent. For client's
+ * upgrading to a release newer than 4.8.1 the existing cell value will be non-null. The client which
+ * wins the race will end up setting the cell value to the {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP}
+ * for the release.
+ * </p>
+ *
+ * @return true if client won the race, false otherwise
+ * @throws IOException
+ * @throws SQLException
+ */
+ @VisibleForTesting
+ public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException,
+ SQLException {
+ Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
+ try (HTableInterface sysCatalogTable = getTable(sysCatalogTableName)) {
+ byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+ byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+ byte[] qualifier = QueryConstants.UPGRADE_MUTEX;
+ byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 ? null
+ : PLong.INSTANCE.toBytes(currentServerSideTableTimestamp);
+ byte[] newValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
+ // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX column isn't used
+ // to calculate SYSTEM.CATALOG's server side timestamp.
+ Put put = new Put(row);
+ put.addColumn(family, qualifier, newValue);
+ boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put);
+ if (!acquired) { throw new UpgradeInProgressException(
+ getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
+ return true;
}
}
@@ -3879,4 +3967,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public void invalidateStats(ImmutableBytesPtr tableName) {
this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
}
+
+ @Override
+ public boolean isUpgradeRequired() {
+ return upgradeRequired.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 560b5d9..337e43c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -650,4 +650,12 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public void invalidateStats(ImmutableBytesPtr tableName) {
this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
}
+
+ @Override
+ public void upgradeSystemTables(String url, Properties props) throws SQLException {}
+
+ @Override
+ public boolean isUpgradeRequired() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 99ad59c..81517e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -345,4 +345,14 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public void invalidateStats(ImmutableBytesPtr tableName) {
getDelegate().invalidateStats(tableName);
}
+
+ @Override
+ public void upgradeSystemTables(String url, Properties props) throws SQLException {
+ getDelegate().upgradeSystemTables(url, props);
+ }
+
+ @Override
+ public boolean isUpgradeRequired() {
+ return getDelegate().isUpgradeRequired();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 8cd009a..51a18d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -224,6 +224,7 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_ASYNC_BUILD_ENABLED = "phoenix.index.async.build.enabled";
public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding";
+ public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled";
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 669bcd2..9b87361 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE;
+import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
@@ -257,6 +258,7 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_INDEX_ASYNC_BUILD_ENABLED = true;
public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString();
+ public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
@SuppressWarnings("serial")
public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
@@ -334,7 +336,8 @@ public class QueryServicesOptions {
.setIfUnset(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE)
.setIfUnset(IS_NAMESPACE_MAPPING_ENABLED, DEFAULT_IS_NAMESPACE_MAPPING_ENABLED)
.setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE)
- .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE);
+ .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)
+ .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
// it to 1, so we'll change it.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 28ed11d..73f1501 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -3406,7 +3406,7 @@ public class MetaDataClient {
String indexTenantId = entry.getKey();
Properties props = new Properties(connection.getClientInfo());
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, indexTenantId);
- try (PhoenixConnection tenantConn = DriverManager.getConnection(connection.getURL(), props).unwrap(PhoenixConnection.class)) {
+ try (PhoenixConnection tenantConn = new PhoenixConnection(connection, connection.getQueryServices(), props)) {
PostDDLCompiler dropCompiler = new PostDDLCompiler(tenantConn);
tenantConn.getQueryServices().updateData(dropCompiler.compile(entry.getValue(), null, null, Collections.<PColumn>emptyList(), ts));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index a8e80ab..fea6d61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -38,16 +38,23 @@ import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.metrics.MetricInfo;
import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import com.google.common.annotations.VisibleForTesting;
@@ -93,7 +100,7 @@ public class PhoenixMetricsSink implements MetricsSink {
private Connection conn;
private String table;
-
+
public PhoenixMetricsSink() {
LOG.info("Writing tracing metrics to phoenix table");
@@ -133,14 +140,25 @@ public class PhoenixMetricsSink implements MetricsSink {
}
}
}
-
+
private void initializeInternal(Connection conn, String tableName) throws SQLException {
this.conn = conn;
-
// ensure that the target table already exists
- createTable(conn, tableName);
+ if (!traceTableExists(conn, tableName)) {
+ createTable(conn, tableName);
+ }
+ this.table = tableName;
}
-
+
+ private boolean traceTableExists(Connection conn, String traceTableName) throws SQLException {
+ try {
+ PhoenixRuntime.getTable(conn, traceTableName);
+ return true;
+ } catch (TableNotFoundException e) {
+ return false;
+ }
+ }
+
/**
* Used for <b>TESTING ONLY</b>
* Initialize the connection and setup the table to use the
@@ -183,10 +201,8 @@ public class PhoenixMetricsSink implements MetricsSink {
// tables created as transactional tables, make these table non
// transactional
PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
-;
PreparedStatement stmt = conn.prepareStatement(ddl);
stmt.execute();
- this.table = table;
}
@Override
@@ -281,7 +297,12 @@ public class PhoenixMetricsSink implements MetricsSink {
for (String tag : variableValues) {
ps.setString(index++, tag);
}
- ps.execute();
+ // Not going through the standard route of using statement.execute() as that code path
+ // is blocked if the metadata hasn't been been upgraded to the new minor release.
+ MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt);
+ MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+ MutationState newState = plan.execute();
+ state.join(newState);
} catch (SQLException e) {
LOG.error("Could not write metric: \n" + record + " to prepared statement:\n" + stmt,
e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index a690dd8..764d135 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -172,7 +172,6 @@ public class PhoenixRuntime {
AUTO_COMMIT_ATTRIB,
CONSISTENCY_ATTRIB,
REQUEST_METRIC_ATTRIB,
- NO_UPGRADE_ATTRIB
};
/**
@@ -215,6 +214,7 @@ public class PhoenixRuntime {
props.setProperty(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, "false");
}
conn = DriverManager.getConnection(jdbcUrl, props).unwrap(PhoenixConnection.class);
+ conn.setRunningUpgrade(true);
if (execCmd.isMapNamespace()) {
String srcTable = execCmd.getSrcTable();
System.out.println("Starting upgrading table:" + srcTable + "... please don't kill it in between!!");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 6d8e00d..bab52a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -310,7 +310,7 @@ public final class QueryUtil {
}
/**
- * @return {@link PhoenixConnection} with NO_UPGRADE_ATTRIB set so that we don't initiate server upgrade
+ * @return {@link PhoenixConnection} with {@value UpgradeUtil#RUN_UPGRADE} set so that we don't initiate server upgrade
*/
public static Connection getConnectionOnServer(Configuration conf) throws ClassNotFoundException,
SQLException {
@@ -318,12 +318,12 @@ public final class QueryUtil {
}
/**
- * @return {@link PhoenixConnection} with NO_UPGRADE_ATTRIB set so that we don't initiate server upgrade
+ * @return {@link PhoenixConnection} with {@value UpgradeUtil#DO_NOT_UPGRADE} set so that we don't initiate metadata upgrade.
*/
public static Connection getConnectionOnServer(Properties props, Configuration conf)
throws ClassNotFoundException,
SQLException {
- props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, Boolean.TRUE.toString());
+ UpgradeUtil.doNotUpgradeOnFirstConnection(props);
return getConnection(props, conf);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 8bc3e63..cddebb7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -132,7 +132,12 @@ public class UpgradeUtil {
private static final Logger logger = LoggerFactory.getLogger(UpgradeUtil.class);
private static final byte[] SEQ_PREFIX_BYTES = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("_SEQ_"));
public static final byte[] UPGRADE_TO_4_7_COLUMN_NAME = Bytes.toBytes("UPGRADE_TO_4_7");
-
+ /**
+ * Attribute for Phoenix's internal purposes only. When this attribute is set on a phoenix connection, then
+ * the upgrade code for upgrading the cluster to the new minor release is not triggered. Note that presence
+ * of this attribute overrides a true value for {@value QueryServices#AUTO_UPGRADE_ENABLED}.
+ */
+ private static final String DO_NOT_UPGRADE = "DoNotUpgrade";
public static String UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW = "UPSERT "
+ "INTO SYSTEM.CATALOG "
+ "(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, BASE_COLUMN_COUNT) "
@@ -174,7 +179,7 @@ public class UpgradeUtil {
+ " FROM " + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_CATALOG_TABLE + " WHERE " + COLUMN_FAMILY + " = ? AND "
+ LINK_TYPE + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue() + " AND ( " + TABLE_TYPE + "=" + "'"
+ PTableType.VIEW.getSerializedValue() + "' OR " + TABLE_TYPE + " IS NULL) ORDER BY "+TENANT_ID;
-
+
private UpgradeUtil() {
}
@@ -1498,7 +1503,7 @@ public class UpgradeUtil {
}
throw new SQLException(buf.toString());
}
- PhoenixConnection upgradeConn = new PhoenixConnection(conn, true);
+ PhoenixConnection upgradeConn = new PhoenixConnection(conn, true, true);
try {
upgradeConn.setAutoCommit(true);
for (PTable table : tablesNeedingUpgrading) {
@@ -1898,4 +1903,12 @@ public class UpgradeUtil {
String upgradingFrom = getVersion(currentSystemTableTimestamp);
return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date;
}
+
+ public static boolean isNoUpgradeSet(Properties props) {
+ return Boolean.compare(true, Boolean.valueOf(props.getProperty(DO_NOT_UPGRADE))) == 0;
+ }
+
+ public static void doNotUpgradeOnFirstConnection(Properties props) {
+ props.setProperty(DO_NOT_UPGRADE, String.valueOf(true));
+ }
}
\ No newline at end of file