You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/11/04 19:05:16 UTC
[phoenix] branch master updated: PHOENIX-6086 : Take snapshot of
all SYSTEM tables before attempting to upgrade them
This is an automated email from the ASF dual-hosted git repository.
yanxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 6889645 PHOENIX-6086 : Take snapshot of all SYSTEM tables before attempting to upgrade them
6889645 is described below
commit 68896451c622598bd62dca8f42c2a467668d635a
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Nov 4 13:06:06 2020 +0530
PHOENIX-6086 : Take snapshot of all SYSTEM tables before attempting to upgrade them
Signed-off-by: Xinyi Yan <ya...@apache.org>
---
.../phoenix/query/ConnectionQueryServicesImpl.java | 175 ++++++++++++++++-----
.../java/org/apache/phoenix/util/UpgradeUtil.java | 7 +-
2 files changed, 137 insertions(+), 45 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 62c531b..7ccbac7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -73,7 +73,7 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
import static org.apache.phoenix.util.UpgradeUtil.addParentToChildLinks;
import static org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks;
-import static org.apache.phoenix.util.UpgradeUtil.getSysCatalogSnapshotName;
+import static org.apache.phoenix.util.UpgradeUtil.getSysTableSnapshotName;
import static org.apache.phoenix.util.UpgradeUtil.moveChildLinks;
import static org.apache.phoenix.util.UpgradeUtil.syncTableAndIndexProperties;
import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
@@ -310,6 +310,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100;
private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000;
+ private static final String ALTER_TABLE_SET_PROPS =
+ "ALTER TABLE %s SET %s=%s";
private final GuidePostsCacheProvider
GUIDE_POSTS_CACHE_PROVIDER = new GuidePostsCacheProvider();
protected final Configuration config;
@@ -3769,11 +3771,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public void upgradeSystemTables(final String url, final Properties props) throws SQLException {
PhoenixConnection metaConnection = null;
boolean success = false;
- String snapshotName = null;
+ final Map<String, String> systemTableToSnapshotMap = new HashMap<>();
String sysCatalogTableName = null;
SQLException toThrow = null;
boolean acquiredMutexLock = false;
- boolean snapshotCreated = false;
boolean moveChildLinks = false;
boolean syncAllTableAndIndexProps = false;
try {
@@ -3808,10 +3809,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
sysCatalogTableName = SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getNameAsString();
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, ConnectionQueryServicesImpl.this.getProps())) {
+ String snapshotName = null;
// Try acquiring a lock in SYSMUTEX table before migrating the tables since it involves disabling the table.
if (acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP)) {
LOGGER.debug("Acquired lock in SYSMUTEX table for migrating SYSTEM tables to SYSTEM namespace "
+ "and/or upgrading " + sysCatalogTableName);
+ snapshotName = getSysTableSnapshotName(
+ currentServerSideTableTimeStamp, SYSTEM_CATALOG_NAME);
+ createSnapshot(snapshotName, SYSTEM_CATALOG_NAME);
+ systemTableToSnapshotMap.put(SYSTEM_CATALOG_NAME,
+ snapshotName);
+ LOGGER.info("Created snapshot {} for {}", snapshotName,
+ SYSTEM_CATALOG_NAME);
}
// We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException
@@ -3819,6 +3828,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// If they don't exist or they're already migrated, this method will return immediately
ensureSystemTablesMigratedToSystemNamespace();
LOGGER.debug("Migrated SYSTEM tables to SYSTEM namespace");
+ if (snapshotName != null) {
+ deleteSnapshot(snapshotName);
+ } else {
+ snapshotName = getSysTableSnapshotName(
+ currentServerSideTableTimeStamp, SYSTEM_CATALOG_NAME);
+ }
+ systemTableToSnapshotMap.remove(SYSTEM_CATALOG_NAME);
+ // take snapshot of SYSTEM:CATALOG
+ createSnapshot(snapshotName, sysCatalogTableName);
+ systemTableToSnapshotMap.put(sysCatalogTableName,
+ snapshotName);
+ LOGGER.info("Created snapshot {} for {}", snapshotName,
+ sysCatalogTableName);
+
metaConnection = upgradeSystemCatalogIfRequired(metaConnection,
currentServerSideTableTimeStamp);
}
@@ -3831,10 +3854,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// SYSCAT table and hence it should not be interrupted
if (acquiredMutexLock = acquireUpgradeMutex(currentServerSideTableTimeStamp)) {
LOGGER.debug("Acquired lock in SYSMUTEX table for upgrading " + sysCatalogTableName);
- snapshotName = getSysCatalogSnapshotName(currentServerSideTableTimeStamp);
- createSnapshot(snapshotName, sysCatalogTableName);
- snapshotCreated = true;
- LOGGER.debug("Created snapshot for SYSCAT");
+ takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
}
// We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException
}
@@ -3848,7 +3868,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- metaConnection = upgradeOtherSystemTablesIfRequired(metaConnection, moveChildLinks);
+ // pass systemTableToSnapshotMap to capture more system table to
+ // snapshot entries
+ metaConnection = upgradeOtherSystemTablesIfRequired(metaConnection,
+ moveChildLinks, systemTableToSnapshotMap);
// Synchronize necessary properties amongst all column families of a base table
// and its indexes. See PHOENIX-3955
if (syncAllTableAndIndexProps) {
@@ -3885,8 +3908,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
} finally {
try {
- if (snapshotCreated) {
- restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
+ for (Map.Entry<String, String> tableToSnapshotEntrySet
+ : systemTableToSnapshotMap.entrySet()) {
+ restoreFromSnapshot(tableToSnapshotEntrySet.getKey(),
+ tableToSnapshotEntrySet.getValue(), success);
}
} catch (SQLException e) {
if (toThrow != null) {
@@ -3913,28 +3938,41 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
* @param metaConnection Phoenix connection
* @param moveChildLinks true if we need to move child links from SYSTEM.CATALOG to
* SYSTEM.CHILD_LINK
+ * @param systemTableToSnapshotMap table-to-snapshot map to which an
+ * entry is added for each system table whose snapshot is created
+ * while upgrading it
* @return Phoenix connection
- * @throws SQLException
+ * @throws SQLException thrown by underlying upgrade system methods
+ * @throws IOException thrown by underlying upgrade system methods
*/
- private PhoenixConnection upgradeOtherSystemTablesIfRequired(PhoenixConnection metaConnection,
- boolean moveChildLinks)
- throws SQLException, IOException {
- metaConnection = upgradeSystemChildLink(metaConnection, moveChildLinks);
- metaConnection = upgradeSystemSequence(metaConnection);
- metaConnection = upgradeSystemStats(metaConnection);
- metaConnection = upgradeSystemTask(metaConnection);
+ private PhoenixConnection upgradeOtherSystemTablesIfRequired(
+ PhoenixConnection metaConnection, boolean moveChildLinks,
+ Map<String, String> systemTableToSnapshotMap)
+ throws SQLException, IOException {
+ // if we are really going to perform upgrades of other system tables,
+ // by this point we would have already taken mutex lock, hence
+ // we can proceed with creation of snapshots and add table to
+ // snapshot entries in systemTableToSnapshotMap
+ metaConnection = upgradeSystemChildLink(metaConnection, moveChildLinks,
+ systemTableToSnapshotMap);
+ metaConnection = upgradeSystemSequence(metaConnection,
+ systemTableToSnapshotMap);
+ metaConnection = upgradeSystemStats(metaConnection,
+ systemTableToSnapshotMap);
+ metaConnection = upgradeSystemTask(metaConnection,
+ systemTableToSnapshotMap);
metaConnection = upgradeSystemFunction(metaConnection);
metaConnection = upgradeSystemLog(metaConnection);
return upgradeSystemMutex(metaConnection);
}
- private PhoenixConnection upgradeSystemChildLink(PhoenixConnection metaConnection,
- boolean moveChildLinks)
- throws SQLException, IOException {
- try {
- metaConnection.createStatement().executeUpdate(getChildLinkDDL());
- } catch (TableAlreadyExistsException ignored) {
-
+ private PhoenixConnection upgradeSystemChildLink(
+ PhoenixConnection metaConnection, boolean moveChildLinks,
+ Map<String, String> systemTableToSnapshotMap) throws SQLException {
+ try (Statement statement = metaConnection.createStatement()) {
+ statement.executeUpdate(getChildLinkDDL());
+ } catch (TableAlreadyExistsException e) {
+ takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
}
if (moveChildLinks) {
moveChildLinks(metaConnection);
@@ -3942,14 +3980,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return metaConnection;
}
- private PhoenixConnection upgradeSystemSequence(PhoenixConnection metaConnection)
- throws SQLException {
+ private PhoenixConnection upgradeSystemSequence(
+ PhoenixConnection metaConnection,
+ Map<String, String> systemTableToSnapshotMap) throws SQLException {
int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(
QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
- try {
+ try (Statement statement = metaConnection.createStatement()) {
String createSequenceTable = getSystemSequenceTableDDL(nSaltBuckets);
- metaConnection.createStatement().executeUpdate(createSequenceTable);
+ statement.executeUpdate(createSequenceTable);
nSequenceSaltBuckets = nSaltBuckets;
} catch (NewerTableAlreadyExistsException e) {
// Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed
@@ -3958,6 +3997,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// fixed timestamp.
nSequenceSaltBuckets = getSaltBuckets(e);
} catch (TableAlreadyExistsException e) {
+ // take snapshot first
+ takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
+
// This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to
// include
// any new columns we've added.
@@ -4000,14 +4042,36 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return metaConnection;
}
- private PhoenixConnection upgradeSystemStats(PhoenixConnection metaConnection)
- throws SQLException {
- try {
- metaConnection.createStatement().executeUpdate(
- QueryConstants.CREATE_STATS_TABLE_METADATA);
+ private void takeSnapshotOfSysTable(
+ Map<String, String> systemTableToSnapshotMap,
+ TableAlreadyExistsException e) throws SQLException {
+ long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+ String tableName = e.getTable().getPhysicalName().getString();
+ String snapshotName = getSysTableSnapshotName(
+ currentServerSideTableTimeStamp, tableName);
+ // HBase does not allow ':' in snapshot names, hence the namespace
+ // separator in the name is replaced with the name separator
+ if (snapshotName.contains(QueryConstants.NAMESPACE_SEPARATOR)) {
+ snapshotName = snapshotName.replace(
+ QueryConstants.NAMESPACE_SEPARATOR,
+ QueryConstants.NAME_SEPARATOR);
+ }
+ createSnapshot(snapshotName, tableName);
+ systemTableToSnapshotMap.put(tableName, snapshotName);
+ LOGGER.info("Snapshot {} created for table {}", snapshotName,
+ tableName);
+ }
+
+ private PhoenixConnection upgradeSystemStats(
+ PhoenixConnection metaConnection,
+ Map<String, String> systemTableToSnapshotMap) throws SQLException {
+ try (Statement statement = metaConnection.createStatement()) {
+ statement.executeUpdate(QueryConstants.CREATE_STATS_TABLE_METADATA);
} catch (NewerTableAlreadyExistsException ignored) {
} catch (TableAlreadyExistsException e) {
+ // take snapshot first
+ takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
if (currentServerSideTableTimeStamp <
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
@@ -4036,13 +4100,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return metaConnection;
}
- private PhoenixConnection upgradeSystemTask(PhoenixConnection metaConnection)
+ private PhoenixConnection upgradeSystemTask(
+ PhoenixConnection metaConnection,
+ Map<String, String> systemTableToSnapshotMap)
throws SQLException, IOException {
- try {
- metaConnection.createStatement().executeUpdate(getTaskDDL());
+ try (Statement statement = metaConnection.createStatement()) {
+ statement.executeUpdate(getTaskDDL());
} catch (NewerTableAlreadyExistsException ignored) {
} catch (TableAlreadyExistsException e) {
+ // take snapshot first
+ takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
if (currentServerSideTableTimeStamp <=
MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0) {
@@ -4060,8 +4128,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
metaConnection =
addColumnsIfNotExists(metaConnection, taskTableFullName,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
- metaConnection.createStatement().executeUpdate(
- "ALTER TABLE " + taskTableFullName + " SET " + TTL + "=" + TASK_TABLE_TTL);
+ try (Statement statement = metaConnection.createStatement()) {
+ String setTtlQuery = String.format(ALTER_TABLE_SET_PROPS,
+ taskTableFullName, TTL, TASK_TABLE_TTL);
+ statement.executeUpdate(setTtlQuery);
+ }
clearCache();
}
// If SYSTEM.TASK does not have disabled regions split policy,
@@ -4092,7 +4163,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(getFunctionTableDDL());
} catch (TableAlreadyExistsException ignored) {
-
+ // Since we are not performing any action as part of upgrading
+ // SYSTEM.FUNCTION, we don't need to take a snapshot as of this
+ // writing. However, if the need arises to perform a significant
+ // update, we should take a snapshot just like for other system
+ // tables; see the usages of takeSnapshotOfSysTable().
}
return metaConnection;
}
@@ -4102,7 +4177,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(getLogTableDDL());
} catch (TableAlreadyExistsException ignored) {
-
+ // Since we are not performing any action as part of upgrading
+ // SYSTEM.LOG, we don't need to take a snapshot as of this
+ // writing. However, if the need arises to perform a significant
+ // update, we should take a snapshot just like for other system
+ // tables; see the usages of takeSnapshotOfSysTable().
}
return metaConnection;
}
@@ -4112,7 +4191,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(getMutexDDL());
} catch (TableAlreadyExistsException ignored) {
-
+ // Since we are not performing any action as part of upgrading
+ // SYSTEM.MUTEX, we don't need to take a snapshot as of this
+ // writing. However, if the need arises to perform a significant
+ // update, we should take a snapshot just like for other system
+ // tables; see the usages of takeSnapshotOfSysTable().
}
return metaConnection;
}
@@ -4198,6 +4281,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return metaConnection;
}
+ private void deleteSnapshot(String snapshotName)
+ throws SQLException, IOException {
+ try (Admin admin = getAdmin()) {
+ admin.deleteSnapshot(snapshotName);
+ LOGGER.info("Snapshot {} is deleted", snapshotName);
+ }
+ }
+
private void createSnapshot(String snapshotName, String tableName)
throws SQLException {
Admin admin = null;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index a33943a..4b52982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -2543,12 +2543,13 @@ public class UpgradeUtil {
}
}
- public static final String getSysCatalogSnapshotName(long currentSystemTableTimestamp) {
- String tableString = SYSTEM_CATALOG_NAME;
+ public static String getSysTableSnapshotName(
+ long currentSystemTableTimestamp, String tableName) {
Format formatter = new SimpleDateFormat("yyyyMMddHHmmss");
String date = formatter.format(new Date(EnvironmentEdgeManager.currentTimeMillis()));
String upgradingFrom = getVersion(currentSystemTableTimestamp);
- return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date;
+ return String.format("SNAPSHOT_%s_%s_TO_%s_%s", tableName,
+ upgradingFrom, CURRENT_CLIENT_VERSION, date);
}
public static boolean isNoUpgradeSet(Properties props) {