You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/11/04 19:05:14 UTC

[phoenix] branch 4.x updated: PHOENIX-6086 : Take snapshot of all SYSTEM tables before attempting to upgrade them

This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 20d2a6d  PHOENIX-6086 : Take snapshot of all SYSTEM tables before attempting to upgrade them
20d2a6d is described below

commit 20d2a6dd1c2df6f87a4beced2d811ee3a6149634
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Nov 4 13:11:36 2020 +0530

    PHOENIX-6086 : Take snapshot of all SYSTEM tables before attempting to upgrade them
    
    Signed-off-by: Xinyi Yan <ya...@apache.org>
---
 .../phoenix/query/ConnectionQueryServicesImpl.java | 175 ++++++++++++++++-----
 .../java/org/apache/phoenix/util/UpgradeUtil.java  |   7 +-
 2 files changed, 137 insertions(+), 45 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index c6dc006..a0512bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -72,7 +72,7 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
 import static org.apache.phoenix.util.UpgradeUtil.addParentToChildLinks;
 import static org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks;
-import static org.apache.phoenix.util.UpgradeUtil.getSysCatalogSnapshotName;
+import static org.apache.phoenix.util.UpgradeUtil.getSysTableSnapshotName;
 import static org.apache.phoenix.util.UpgradeUtil.moveChildLinks;
 import static org.apache.phoenix.util.UpgradeUtil.syncTableAndIndexProperties;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
@@ -306,6 +306,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
     private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100;
     private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000;
+    private static final String ALTER_TABLE_SET_PROPS =
+        "ALTER TABLE %s SET %s=%s";
     private final GuidePostsCacheProvider
             GUIDE_POSTS_CACHE_PROVIDER = new GuidePostsCacheProvider();
     protected final Configuration config;
@@ -3749,11 +3751,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public void upgradeSystemTables(final String url, final Properties props) throws SQLException {
         PhoenixConnection metaConnection = null;
         boolean success = false;
-        String snapshotName = null;
+        final Map<String, String> systemTableToSnapshotMap = new HashMap<>();
         String sysCatalogTableName = null;
         SQLException toThrow = null;
         boolean acquiredMutexLock = false;
-        boolean snapshotCreated = false;
         boolean moveChildLinks = false;
         boolean syncAllTableAndIndexProps = false;
         try {
@@ -3788,10 +3789,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
                 sysCatalogTableName = SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getNameAsString();
                 if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, ConnectionQueryServicesImpl.this.getProps())) {
+                    String snapshotName = null;
                     // Try acquiring a lock in SYSMUTEX table before migrating the tables since it involves disabling the table.
                     if (acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP)) {
                         LOGGER.debug("Acquired lock in SYSMUTEX table for migrating SYSTEM tables to SYSTEM namespace "
                           + "and/or upgrading " + sysCatalogTableName);
+                        snapshotName = getSysTableSnapshotName(
+                            currentServerSideTableTimeStamp, SYSTEM_CATALOG_NAME);
+                        createSnapshot(snapshotName, SYSTEM_CATALOG_NAME);
+                        systemTableToSnapshotMap.put(SYSTEM_CATALOG_NAME,
+                            snapshotName);
+                        LOGGER.info("Created snapshot {} for {}", snapshotName,
+                            SYSTEM_CATALOG_NAME);
                     }
                     // We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException
 
@@ -3799,6 +3808,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     // If they don't exist or they're already migrated, this method will return immediately
                     ensureSystemTablesMigratedToSystemNamespace();
                     LOGGER.debug("Migrated SYSTEM tables to SYSTEM namespace");
+                    if (snapshotName != null) {
+                        deleteSnapshot(snapshotName);
+                    } else {
+                        snapshotName = getSysTableSnapshotName(
+                            currentServerSideTableTimeStamp, SYSTEM_CATALOG_NAME);
+                    }
+                    systemTableToSnapshotMap.remove(SYSTEM_CATALOG_NAME);
+                    // take snapshot of SYSTEM:CATALOG
+                    createSnapshot(snapshotName, sysCatalogTableName);
+                    systemTableToSnapshotMap.put(sysCatalogTableName,
+                        snapshotName);
+                    LOGGER.info("Created snapshot {} for {}", snapshotName,
+                        sysCatalogTableName);
+
                     metaConnection = upgradeSystemCatalogIfRequired(metaConnection,
                             currentServerSideTableTimeStamp);
                 }
@@ -3811,10 +3834,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     // SYSCAT table and hence it should not be interrupted
                     if (acquiredMutexLock = acquireUpgradeMutex(currentServerSideTableTimeStamp)) {
                         LOGGER.debug("Acquired lock in SYSMUTEX table for upgrading " + sysCatalogTableName);
-                        snapshotName = getSysCatalogSnapshotName(currentServerSideTableTimeStamp);
-                        createSnapshot(snapshotName, sysCatalogTableName);
-                        snapshotCreated = true;
-                        LOGGER.debug("Created snapshot for SYSCAT");
+                        takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
                     }
                     // We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException
                 }
@@ -3828,7 +3848,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
             }
 
-            metaConnection = upgradeOtherSystemTablesIfRequired(metaConnection, moveChildLinks);
+            // pass systemTableToSnapshotMap to capture more system table to
+            // snapshot entries
+            metaConnection = upgradeOtherSystemTablesIfRequired(metaConnection,
+                moveChildLinks, systemTableToSnapshotMap);
             // Synchronize necessary properties amongst all column families of a base table
             // and its indexes. See PHOENIX-3955
             if (syncAllTableAndIndexProps) {
@@ -3865,8 +3888,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
             } finally {
                 try {
-                    if (snapshotCreated) {
-                        restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
+                    for (Map.Entry<String, String> tableToSnapshotEntrySet
+                            : systemTableToSnapshotMap.entrySet()) {
+                        restoreFromSnapshot(tableToSnapshotEntrySet.getKey(),
+                            tableToSnapshotEntrySet.getValue(), success);
                     }
                 } catch (SQLException e) {
                     if (toThrow != null) {
@@ -3893,28 +3918,41 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      * @param metaConnection Phoenix connection
      * @param moveChildLinks true if we need to move child links from SYSTEM.CATALOG to
      *                       SYSTEM.CHILD_LINK
+     * @param systemTableToSnapshotMap table to snapshot map which can be
+     *     where new entries of system table to it's corresponding  created
+     *     snapshot is added
      * @return Phoenix connection
-     * @throws SQLException
+     * @throws SQLException thrown by underlying upgrade system methods
+     * @throws IOException thrown by underlying upgrade system methods
      */
-    private PhoenixConnection upgradeOtherSystemTablesIfRequired(PhoenixConnection metaConnection,
-            boolean moveChildLinks)
-    throws SQLException, IOException {
-        metaConnection = upgradeSystemChildLink(metaConnection, moveChildLinks);
-        metaConnection = upgradeSystemSequence(metaConnection);
-        metaConnection = upgradeSystemStats(metaConnection);
-        metaConnection = upgradeSystemTask(metaConnection);
+    private PhoenixConnection upgradeOtherSystemTablesIfRequired(
+            PhoenixConnection metaConnection, boolean moveChildLinks,
+            Map<String, String> systemTableToSnapshotMap)
+            throws SQLException, IOException {
+        // if we are really going to perform upgrades of other system tables,
+        // by this point we would have already taken mutex lock, hence
+        // we can proceed with creation of snapshots and add table to
+        // snapshot entries in systemTableToSnapshotMap
+        metaConnection = upgradeSystemChildLink(metaConnection, moveChildLinks,
+            systemTableToSnapshotMap);
+        metaConnection = upgradeSystemSequence(metaConnection,
+            systemTableToSnapshotMap);
+        metaConnection = upgradeSystemStats(metaConnection,
+            systemTableToSnapshotMap);
+        metaConnection = upgradeSystemTask(metaConnection,
+            systemTableToSnapshotMap);
         metaConnection = upgradeSystemFunction(metaConnection);
         metaConnection = upgradeSystemLog(metaConnection);
         return upgradeSystemMutex(metaConnection);
     }
 
-    private PhoenixConnection upgradeSystemChildLink(PhoenixConnection metaConnection,
-            boolean moveChildLinks)
-    throws SQLException, IOException {
-        try {
-            metaConnection.createStatement().executeUpdate(getChildLinkDDL());
-        } catch (TableAlreadyExistsException ignored) {
-
+    private PhoenixConnection upgradeSystemChildLink(
+            PhoenixConnection metaConnection, boolean moveChildLinks,
+            Map<String, String> systemTableToSnapshotMap) throws SQLException {
+        try (Statement statement = metaConnection.createStatement()) {
+            statement.executeUpdate(getChildLinkDDL());
+        } catch (TableAlreadyExistsException e) {
+            takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
         }
         if (moveChildLinks) {
             moveChildLinks(metaConnection);
@@ -3922,14 +3960,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return metaConnection;
     }
 
-    private PhoenixConnection upgradeSystemSequence(PhoenixConnection metaConnection)
-    throws SQLException {
+    private PhoenixConnection upgradeSystemSequence(
+            PhoenixConnection metaConnection,
+            Map<String, String> systemTableToSnapshotMap) throws SQLException {
         int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(
                 QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
                 QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
-        try {
+        try (Statement statement = metaConnection.createStatement()) {
             String createSequenceTable = getSystemSequenceTableDDL(nSaltBuckets);
-            metaConnection.createStatement().executeUpdate(createSequenceTable);
+            statement.executeUpdate(createSequenceTable);
             nSequenceSaltBuckets = nSaltBuckets;
         } catch (NewerTableAlreadyExistsException e) {
             // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed
@@ -3938,6 +3977,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             // fixed timestamp.
             nSequenceSaltBuckets = getSaltBuckets(e);
         } catch (TableAlreadyExistsException e) {
+            // take snapshot first
+            takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
+
             // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to
             // include
             // any new columns we've added.
@@ -3979,14 +4021,36 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return metaConnection;
     }
 
-    private PhoenixConnection upgradeSystemStats(PhoenixConnection metaConnection)
-    throws SQLException {
-        try {
-            metaConnection.createStatement().executeUpdate(
-                    QueryConstants.CREATE_STATS_TABLE_METADATA);
+    private void takeSnapshotOfSysTable(
+            Map<String, String> systemTableToSnapshotMap,
+            TableAlreadyExistsException e) throws SQLException {
+        long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+        String tableName = e.getTable().getPhysicalName().getString();
+        String snapshotName = getSysTableSnapshotName(
+            currentServerSideTableTimeStamp, tableName);
+        // Snapshot qualifiers may only contain 'alphanumeric characters' and
+        // digits, hence : cannot be part of snapshot name
+        if (snapshotName.contains(QueryConstants.NAMESPACE_SEPARATOR)) {
+            snapshotName = snapshotName.replace(
+                QueryConstants.NAMESPACE_SEPARATOR,
+                QueryConstants.NAME_SEPARATOR);
+        }
+        createSnapshot(snapshotName, tableName);
+        systemTableToSnapshotMap.put(tableName, snapshotName);
+        LOGGER.info("Snapshot {} created for table {}", snapshotName,
+            tableName);
+    }
+
+    private PhoenixConnection upgradeSystemStats(
+            PhoenixConnection metaConnection,
+            Map<String, String> systemTableToSnapshotMap) throws SQLException {
+        try (Statement statement = metaConnection.createStatement()) {
+            statement.executeUpdate(QueryConstants.CREATE_STATS_TABLE_METADATA);
         } catch (NewerTableAlreadyExistsException ignored) {
 
         } catch (TableAlreadyExistsException e) {
+            // take snapshot first
+            takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
             long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
             if (currentServerSideTableTimeStamp <
                     MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
@@ -4015,13 +4079,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return metaConnection;
     }
 
-    private PhoenixConnection upgradeSystemTask(PhoenixConnection metaConnection)
+    private PhoenixConnection upgradeSystemTask(
+            PhoenixConnection metaConnection,
+            Map<String, String> systemTableToSnapshotMap)
             throws SQLException, IOException {
-        try {
-            metaConnection.createStatement().executeUpdate(getTaskDDL());
+        try (Statement statement = metaConnection.createStatement()) {
+            statement.executeUpdate(getTaskDDL());
         } catch (NewerTableAlreadyExistsException ignored) {
 
         } catch (TableAlreadyExistsException e) {
+            // take snapshot first
+            takeSnapshotOfSysTable(systemTableToSnapshotMap, e);
             long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
             if (currentServerSideTableTimeStamp <=
                     MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0) {
@@ -4039,8 +4107,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 metaConnection =
                         addColumnsIfNotExists(metaConnection, taskTableFullName,
                                 MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
-                metaConnection.createStatement().executeUpdate(
-                        "ALTER TABLE " + taskTableFullName + " SET " + TTL + "=" + TASK_TABLE_TTL);
+                try (Statement statement = metaConnection.createStatement()) {
+                    String setTtlQuery = String.format(ALTER_TABLE_SET_PROPS,
+                        taskTableFullName, TTL, TASK_TABLE_TTL);
+                    statement.executeUpdate(setTtlQuery);
+                }
                 clearCache();
             }
             // If SYSTEM.TASK does not have disabled regions split policy,
@@ -4068,7 +4139,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             metaConnection.createStatement().executeUpdate(getFunctionTableDDL());
         } catch (TableAlreadyExistsException ignored) {
-
+            // Since we are not performing any action as part of upgrading
+            // SYSTEM.FUNCTION, we don't need to take snapshot as of this
+            // writing. However, if need arises to perform significant
+            // update, we should take snapshot just like other system tables.
+            // e.g usages of takeSnapshotOfSysTable()
         }
         return metaConnection;
     }
@@ -4078,7 +4153,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             metaConnection.createStatement().executeUpdate(getLogTableDDL());
         } catch (TableAlreadyExistsException ignored) {
-
+            // Since we are not performing any action as part of upgrading
+            // SYSTEM.LOG, we don't need to take snapshot as of this
+            // writing. However, if need arises to perform significant
+            // update, we should take snapshot just like other system tables.
+            // e.g usages of takeSnapshotOfSysTable()
         }
         return metaConnection;
     }
@@ -4088,7 +4167,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             metaConnection.createStatement().executeUpdate(getMutexDDL());
         } catch (TableAlreadyExistsException ignored) {
-
+            // Since we are not performing any action as part of upgrading
+            // SYSTEM.MUTEX, we don't need to take snapshot as of this
+            // writing. However, if need arises to perform significant
+            // update, we should take snapshot just like other system tables.
+            // e.g usages of takeSnapshotOfSysTable()
         }
         return metaConnection;
     }
@@ -4174,6 +4257,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return metaConnection;
     }
 
+    private void deleteSnapshot(String snapshotName)
+            throws SQLException, IOException {
+        try (HBaseAdmin admin = getAdmin()) {
+            admin.deleteSnapshot(snapshotName);
+            LOGGER.info("Snapshot {} is deleted", snapshotName);
+        }
+    }
+
     private void createSnapshot(String snapshotName, String tableName)
             throws SQLException {
         HBaseAdmin admin = null;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index c91fff9..207d472 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -2570,12 +2570,13 @@ public class UpgradeUtil {
         }
     }
 
-    public static final String getSysCatalogSnapshotName(long currentSystemTableTimestamp) {
-        String tableString = SYSTEM_CATALOG_NAME;
+    public static String getSysTableSnapshotName(
+            long currentSystemTableTimestamp, String tableName) {
         Format formatter = new SimpleDateFormat("yyyyMMddHHmmss");
         String date = formatter.format(new Date(EnvironmentEdgeManager.currentTimeMillis()));
         String upgradingFrom = getVersion(currentSystemTableTimestamp);
-        return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date;
+        return String.format("SNAPSHOT_%s_%s_TO_%s_%s", tableName,
+            upgradingFrom, CURRENT_CLIENT_VERSION, date);
     }
     
     public static boolean isNoUpgradeSet(Properties props) {