You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/09/20 18:06:06 UTC

[43/47] phoenix git commit: PHOENIX-3174 Make minor upgrade a manual step

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1aa9b88..dfe7ee8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -63,6 +63,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -131,6 +132,8 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateR
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.exception.UpgradeInProgressException;
+import org.apache.phoenix.exception.UpgradeNotRequiredException;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
 import org.apache.phoenix.hbase.index.Indexer;
@@ -266,6 +269,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues;
     private ScheduledExecutorService renewLeaseExecutor;
     private final boolean renewLeaseEnabled;
+    private final boolean isAutoUpgradeEnabled;
+    private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
 
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
@@ -342,6 +347,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         connectionQueues = ImmutableList.copyOf(list);
         // A little bit of a smell to leak `this` here, but should not be a problem
         this.tableStatsCache = new TableStatsCache(this, config);
+        this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED);
     }
 
     @Override
@@ -2310,29 +2316,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             return null;
                         }
                         checkClosed();
-                        PhoenixConnection metaConnection = null;
-                        boolean success = false;
-                        String snapshotName = null;
-                        String sysCatalogTableName = null;
                         try {
                             openConnection();
-                            String noUpgradeProp = props.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB);
-                            boolean upgradeSystemTables = !Boolean.TRUE.equals(Boolean.valueOf(noUpgradeProp));
-                            Properties scnProps = PropertiesUtil.deepCopy(props);
-                            scnProps.setProperty(
-                                    PhoenixRuntime.CURRENT_SCN_ATTRIB,
-                                    Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
-                            scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
-                            String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
-                            metaConnection = new PhoenixConnection(
-                                    ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData());
+                            boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props);
                             try (HBaseAdmin admin = getAdmin()) {
                                 boolean mappedSystemCatalogExists = admin
                                         .tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true));
                                 if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
                                         ConnectionQueryServicesImpl.this.getProps())) {
                                     if (admin.tableExists(SYSTEM_CATALOG_NAME_BYTES)) {
-                                        //check if the server is already updated and have namespace config properly set.
+                                        //check if the server is already updated and have namespace config properly set. 
                                         checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES);
                                     }
                                     ensureSystemTablesUpgraded(ConnectionQueryServicesImpl.this.getProps());
@@ -2345,232 +2338,32 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                                 + IS_NAMESPACE_MAPPING_ENABLED + " enabled")
                                                 .build().buildException(); }
                             }
-                            try {
-                                metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
-                            } catch (NewerTableAlreadyExistsException ignore) {
-                                // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp.
-                                // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
-                            } catch (TableAlreadyExistsException e) {
-                                if (upgradeSystemTables) {
-                                    long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
-                                    sysCatalogTableName = e.getTable().getPhysicalName().getString();
-                                    if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable().getPhysicalName().getBytes())) {
-                                        snapshotName = getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp);
-                                        createSnapshot(snapshotName, sysCatalogTableName);
-                                    }
-                                    String columnsToAdd = "";
-                                    // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
-                                    // any new columns we've added.
-                                    if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
-                                        // We know that we always need to add the STORE_NULLS column for 4.3 release
-                                        columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName());
-                                        try (HBaseAdmin admin = getAdmin()) {
-                                            HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
-                                            for (HTableDescriptor table : localIndexTables) {
-                                                if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
-                                                        && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
-                                                    table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
-                                                            MetaDataUtil.getUserTableName(table
-                                                                    .getNameAsString()));
-                                                    // Explicitly disable, modify and enable the table to ensure co-location of data
-                                                    // and index regions. If we just modify the table descriptor when online schema
-                                                    // change enabled may reopen the region in same region server instead of following data region.
-                                                    admin.disableTable(table.getTableName());
-                                                    admin.modifyTable(table.getTableName(), table);
-                                                    admin.enableTable(table.getTableName());
-                                                }
-                                            }
-                                        }
-                                    }
-
-                                    // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
-                                    // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
-                                    // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
-                                    // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all
-                                    // the column names that have been added to SYSTEM.CATALOG since 4.0.
-                                    if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
-                                        columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
-                                                + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName());
-                                    }
-
-                                    // If we have some new columns from 4.1-4.3 to add, add them now.
-                                    if (!columnsToAdd.isEmpty()) {
-                                        // Ugh..need to assign to another local variable to keep eclipse happy.
-                                        PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
-                                                PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
-                                        metaConnection = newMetaConnection;
-                                    }
-
-                                    if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
-                                        columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
-                                                + PInteger.INSTANCE.getSqlTypeName();
-                                        try {
-                                            metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                    MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false);
-                                            upgradeTo4_5_0(metaConnection);
-                                        } catch (ColumnAlreadyExistsException ignored) {
-                                            /*
-                                             * Upgrade to 4.5 is a slightly special case. We use the fact that the column
-                                             * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that
-                                             * the server side upgrade has finished or is in progress.
-                                             */
-                                            logger.debug("No need to run 4.5 upgrade");
-                                        }
-                                        Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
-                                        props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
-                                        props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
-                                        PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache());
-                                        try {
-                                            List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn);
-                                            if (!tablesNeedingUpgrade.isEmpty()) {
-                                                logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
-                                            }
-                                            List<String> unsupportedTables = UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn);
-                                            if (!unsupportedTables.isEmpty()) {
-                                                logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables));
-                                            }
-                                        } catch (Exception ex) {
-                                            logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex);
-                                        } finally {
-                                            conn.close();
-                                        }
-                                    }
-                                    // Add these columns one at a time, each with different timestamps so that if folks have
-                                    // run the upgrade code already for a snapshot, we'll still enter this block (and do the
-                                    // parts we haven't yet done).
-                                    if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
-                                        columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName();
-                                        metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
-                                    }
-                                    if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
-                                        // Drop old stats table so that new stats table is created
-                                        metaConnection = dropStatsTable(metaConnection,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
-                                        metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
-                                                PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName());
-                                        metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
-                                                PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + PLong.INSTANCE.getSqlTypeName());
-                                        metaConnection = setImmutableTableIndexesImmutable(metaConnection,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
-                                        metaConnection = updateSystemCatalogTimestamp(metaConnection,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
-                                        ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
-                                        clearCache();
-                                    }
-
-                                    if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
-                                        metaConnection = addColumnsIfNotExists(metaConnection,
-                                                PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
-                                                PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
-                                                        + PBoolean.INSTANCE.getSqlTypeName());
-                                        metaConnection = addColumnsIfNotExists(metaConnection,
-                                                PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
-                                                PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
-                                                        + PVarchar.INSTANCE.getSqlTypeName());
-                                        metaConnection = addColumnsIfNotExists(metaConnection,
-                                                PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
-                                                PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
-                                                        + PBoolean.INSTANCE.getSqlTypeName());
-                                        metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
-                                        if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
-                                                QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
-                                            metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
-                                        }
-                                        ConnectionQueryServicesImpl.this.removeTable(null,
-                                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
-                                        clearCache();
-                                    }
-
-                                }
-                            }
-
-                            int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
-                                    QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
-                            try {
-                                String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets);
-                                metaConnection.createStatement().executeUpdate(createSequenceTable);
-                                nSequenceSaltBuckets = nSaltBuckets;
-                            } catch (NewerTableAlreadyExistsException e) {
-                                // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
-                                // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
-                                nSequenceSaltBuckets = getSaltBuckets(e);
-                            } catch (TableAlreadyExistsException e) {
-                                if (upgradeSystemTables) {
-                                    // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to include
-                                    // any new columns we've added.
+                            Properties scnProps = PropertiesUtil.deepCopy(props);
+                            scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                                    Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+                            scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                            String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
+                            try (PhoenixConnection metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl,
+                                    scnProps, newEmptyMetaData())) {
+                                try {
+                                    metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+                                } catch (NewerTableAlreadyExistsException ignore) {
+                                    // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed
+                                    // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists
+                                    // *after* this fixed timestamp.
+                                } catch (TableAlreadyExistsException e) {
                                     long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
-                                    if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
-                                        // If the table time stamp is before 4.1.0 then we need to add below columns
-                                        // to the SYSTEM.SEQUENCE table.
-                                        String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
-                                                + ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
-                                                + ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName()
-                                                + ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName();
-                                        addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
-                                    }
-                                    // If the table timestamp is before 4.2.1 then run the upgrade script
-                                    if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) {
-                                        if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
-                                            metaConnection.removeTable(null,
-                                                   PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA,
-                                                   PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE,
-                                                   MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
-                                            clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
-                                                   PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES,
-                                                   PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES,
-                                                   MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
-                                            clearTableRegionCache(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES);
-                                        }
-                                        nSequenceSaltBuckets = nSaltBuckets;
-                                    } else {
-                                        nSequenceSaltBuckets = getSaltBuckets(e);
+                                    if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) {
+                                        ConnectionQueryServicesImpl.this.upgradeRequired.set(true);
                                     }
                                 }
-                            }
-                            try {
-                                metaConnection.createStatement().executeUpdate(
-                                        QueryConstants.CREATE_STATS_TABLE_METADATA);
-                            } catch (NewerTableAlreadyExistsException ignore) {
-                            } catch(TableAlreadyExistsException e) {
-                                if (upgradeSystemTables) {
-                                    long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
-                                    if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
-                                        metaConnection = addColumnsIfNotExists(
-                                               metaConnection,
-                                               SYSTEM_STATS_NAME,
-                                               MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
-                                               PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
-                                                       + PLong.INSTANCE.getSqlTypeName());
-                                    }
+                                if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
+                                    createOtherSystemTables(metaConnection);
+                                } else if (isAutoUpgradeEnabled && !isDoNotUpgradePropSet) {
+                                    upgradeSystemTables(url, props);
                                 }
                             }
-                            try {
-                                metaConnection.createStatement().executeUpdate(
-                                        QueryConstants.CREATE_FUNCTION_METADATA);
-                            } catch (NewerTableAlreadyExistsException e) {
-                            } catch (TableAlreadyExistsException e) {
-                            }
-                            if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
-                                    ConnectionQueryServicesImpl.this.getProps())) {
-                                try {
-                                    metaConnection.createStatement().executeUpdate("CREATE SCHEMA IF NOT EXISTS "
-                                            + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
-                                } catch (NewerSchemaAlreadyExistsException e) {}
-                            }
-                            success = true;
                             scheduleRenewLeaseTasks();
-                        } catch (UpgradeInProgressException e) {
-                            // don't set it as initializationException because otherwise client won't be able to retry
-                            throw e;
                         } catch (Exception e) {
                             if (e instanceof SQLException) {
                                 initializationException = (SQLException)e;
@@ -2579,222 +2372,517 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 initializationException = new SQLException(e);
                             }
                         } finally {
-                            try {
-                                if (metaConnection != null) metaConnection.close();
-                            } catch (SQLException e) {
-                                if (initializationException != null) {
-                                    initializationException.setNextException(e);
-                                } else {
-                                    initializationException = e;
-                                }
-                            } finally {
-                                try {
-                                    restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
-                                } catch (SQLException e) {
-                                    if (initializationException != null) {
-                                        initializationException.setNextException(e);
-                                    } else {
-                                        initializationException = e;
-                                    }
-                                }
-                                try {
-                                    if (initializationException != null) {
-                                        throw initializationException;
-                                    }
-                                } finally {
-                                    initialized = true;
-                                }
-                            }
+                            initialized = true;
                         }
-                    }
+                    } 
                     return null;
                 }
-
-                private void createSnapshot(String snapshotName, String tableName)
-                        throws SQLException {
-                    HBaseAdmin admin = null;
-                    SQLException sqlE = null;
-                    try {
-                        admin = getAdmin();
-                        admin.snapshot(snapshotName, tableName);
-                        logger.info("Successfully created snapshot " + snapshotName + " for "
-                                + tableName);
-                    } catch (Exception e) {
-                        sqlE = new SQLException(e);
-                    } finally {
-                        try {
-                            if (admin != null) {
-                                admin.close();
-                            }
-                        } catch (Exception e) {
-                            SQLException adminCloseEx = new SQLException(e);
-                            if (sqlE == null) {
-                                sqlE = adminCloseEx;
-                            } else {
-                                sqlE.setNextException(adminCloseEx);
-                            }
-                        } finally {
-                            if (sqlE != null) {
-                                throw sqlE;
+            });
+        } catch (Exception e) {
+            Throwables.propagateIfInstanceOf(e, SQLException.class);
+            Throwables.propagate(e);
+        }
+    }
+    
+    private void createOtherSystemTables(PhoenixConnection metaConnection) throws SQLException {
+        // Execute the sequence, stats and function metadata DDL in that order. A table that is
+        // already present is skipped; any other SQLException propagates to the caller.
+        final String[] ddlStatements = { QueryConstants.CREATE_SEQUENCE_METADATA,
+                QueryConstants.CREATE_STATS_TABLE_METADATA, QueryConstants.CREATE_FUNCTION_METADATA };
+        for (String ddl : ddlStatements) {
+            try {
+                metaConnection.createStatement().execute(ddl);
+            } catch (TableAlreadyExistsException ignore) {}
+        }
+    }
+    
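+    /**
+     * Runs the system table upgrade, or throws {@link UpgradeNotRequiredException} if no upgrade is required. No other
+     * locking is needed here since only one connection (on the same or a different JVM) will be able to acquire the
+     * upgrade mutex via {@link #acquireUpgradeMutex(long, byte[])}.
+     */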
+    @Override
+    public void upgradeSystemTables(final String url, final Properties props) throws SQLException {
+        PhoenixConnection metaConnection = null;
+        boolean success = false;
+        String snapshotName = null;
+        String sysCatalogTableName = null;
+        SQLException toThrow = null;
+        try {
+            if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
+                throw new UpgradeNotRequiredException();
+            }
+            Properties scnProps = PropertiesUtil.deepCopy(props);
+            scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                    Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+            scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+            String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
+            metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl,
+                    scnProps, newEmptyMetaData());
+            metaConnection.setRunningUpgrade(true);
+            try {
+                metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+            } catch (NewerTableAlreadyExistsException ignore) {
+                // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed
+                // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists
+                // *after* this fixed timestamp.
+            } catch (TableAlreadyExistsException e) {
+                long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+                sysCatalogTableName = e.getTable().getPhysicalName().getString();
+                if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP
+                        && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable()
+                                .getPhysicalName().getBytes())) {
+                    snapshotName = getUpgradeSnapshotName(sysCatalogTableName,
+                            currentServerSideTableTimeStamp);
+                    createSnapshot(snapshotName, sysCatalogTableName);
+                }
+                String columnsToAdd = "";
+                // This will occur if we have an older SYSTEM.CATALOG and we need to update it to
+                // include any new columns we've added.
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
+                    // We know that we always need to add the STORE_NULLS column for 4.3 release
+                    columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS
+                            + " " + PBoolean.INSTANCE.getSqlTypeName());
+                    try (HBaseAdmin admin = getAdmin()) {
+                        HTableDescriptor[] localIndexTables = admin
+                                .listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + ".*");
+                        for (HTableDescriptor table : localIndexTables) {
+                            if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
+                                    && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
+                                table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
+                                        MetaDataUtil.getUserTableName(table.getNameAsString()));
+                                // Explicitly disable, modify and enable the table to ensure
+                                // co-location of data and index regions. If we only modify the table
+                                // descriptor while online schema change is enabled, the region may be
+                                // reopened on the same region server instead of following the data region.
+                                admin.disableTable(table.getTableName());
+                                admin.modifyTable(table.getTableName(), table);
+                                admin.enableTable(table.getTableName());
                             }
                         }
                     }
                 }
 
-                private void restoreFromSnapshot(String tableName, String snapshotName,
-                        boolean success) throws SQLException {
-                    boolean snapshotRestored = false;
-                    boolean tableDisabled = false;
-                    if (!success && snapshotName != null) {
-                        SQLException sqlE = null;
-                        HBaseAdmin admin = null;
-                        try {
-                            logger.warn("Starting restore of " + tableName + " using snapshot "
-                                    + snapshotName + " because upgrade failed");
-                            admin = getAdmin();
-                            admin.disableTable(tableName);
-                            tableDisabled = true;
-                            admin.restoreSnapshot(snapshotName);
-                            snapshotRestored = true;
-                            logger.warn("Successfully restored " + tableName + " using snapshot "
-                                    + snapshotName);
-                        } catch (Exception e) {
-                            sqlE = new SQLException(e);
-                        } finally {
-                            if (admin != null && tableDisabled) {
-                                try {
-                                    admin.enableTable(tableName);
-                                    if (snapshotRestored) {
-                                        logger.warn("Successfully restored and enabled " + tableName + " using snapshot "
-                                                + snapshotName);
-                                    } else {
-                                        logger.warn("Successfully enabled " + tableName + " after restoring using snapshot "
-                                                + snapshotName + " failed. ");
-                                    }
-                                } catch (Exception e1) {
-                                    SQLException enableTableEx = new SQLException(e1);
-                                    if (sqlE == null) {
-                                        sqlE = enableTableEx;
-                                    } else {
-                                        sqlE.setNextException(enableTableEx);
-                                    }
-                                    logger.error("Failure in enabling "
-                                            + tableName
-                                            + (snapshotRestored ? " after successfully restoring using snapshot"
-                                                    + snapshotName
-                                                    : " after restoring using snapshot "
-                                                    + snapshotName + " failed. "));
-                                } finally {
-                                    try {
-                                        admin.close();
-                                    } catch (Exception e2) {
-                                        SQLException adminCloseEx = new SQLException(e2);
-                                        if (sqlE == null) {
-                                            sqlE = adminCloseEx;
-                                        } else {
-                                            sqlE.setNextException(adminCloseEx);
-                                        }
-                                    } finally {
-                                        if (sqlE != null) {
-                                            throw sqlE;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
+                // If the server-side schema is older than MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
+                // we need to add the INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
+                // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
+                // we should just have a single ALTER TABLE ADD IF NOT EXISTS statement with all
+                // the column names that have been added to SYSTEM.CATALOG since 4.0.
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
+                    columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " "
+                            + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", "
+                            + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " "
+                            + PLong.INSTANCE.getSqlTypeName());
                 }
 
-                private void ensureSystemTablesUpgraded(ReadOnlyProps props)
-                        throws SQLException, IOException, IllegalArgumentException, InterruptedException {
-                    if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; }
-                    HTableInterface metatable = null;
-                    try (HBaseAdmin admin = getAdmin()) {
-                        ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME);
-                        List<HTableDescriptor> tables = Arrays
-                                .asList(admin.listTables(QueryConstants.SYSTEM_SCHEMA_NAME + "\\..*", false));
-                        List<String> tableNames = getTableNames(tables);
-                        if (tableNames.size() == 0) { return; }
-                        if (tableNames.size() > 4) { throw new IllegalArgumentException(
-                                "Expected 4 system table only but found " + tableNames.size() + ":" + tableNames); }
-                        byte[] mappedSystemTable = SchemaUtil
-                                .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName();
-                        metatable = getTable(mappedSystemTable);
-                        if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)) {
-                            if (!admin.tableExists(mappedSystemTable)) {
-                                UpgradeUtil.mapTableToNamespace(admin, metatable,
-                                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM,
-                                        null);
-                                ConnectionQueryServicesImpl.this.removeTable(null,
-                                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
-                                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
-                            }
-                            tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
-                        }
-                        for (String table : tableNames) {
-                            UpgradeUtil.mapTableToNamespace(admin, metatable, table, props, null, PTableType.SYSTEM,
-                                    null);
-                            ConnectionQueryServicesImpl.this.removeTable(null, table, null,
-                                    MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
+                // If we have some new columns from 4.1-4.3 to add, add them now.
+                if (!columnsToAdd.isEmpty()) {
+                    // Ugh..need to assign to another local variable to keep eclipse happy.
+                    PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
+                    metaConnection = newMetaConnection;
+                }
+
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
+                    columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
+                            + PInteger.INSTANCE.getSqlTypeName();
+                    try {
+                        metaConnection = addColumn(metaConnection,
+                                PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd,
+                                false);
+                        upgradeTo4_5_0(metaConnection);
+                    } catch (ColumnAlreadyExistsException ignored) {
+                        /*
+                         * Upgrade to 4.5 is a slightly special case. We use the fact that the
+                         * column BASE_COLUMN_COUNT is already part of the meta-data schema as the
+                         * signal that the server side upgrade has finished or is in progress.
+                         */
+                        logger.debug("No need to run 4.5 upgrade");
+                    }
+                    Properties p = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+                    p.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+                    p.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                    PhoenixConnection conn = new PhoenixConnection(
+                            ConnectionQueryServicesImpl.this, metaConnection.getURL(), p,
+                            metaConnection.getMetaDataCache());
+                    try {
+                        List<String> tablesNeedingUpgrade = UpgradeUtil
+                                .getPhysicalTablesWithDescRowKey(conn);
+                        if (!tablesNeedingUpgrade.isEmpty()) {
+                            logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n"
+                                    + Joiner.on(' ').join(tablesNeedingUpgrade)
+                                    + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
                         }
-                        if (!tableNames.isEmpty()) {
-                            clearCache();
+                        List<String> unsupportedTables = UpgradeUtil
+                                .getPhysicalTablesWithDescVarbinaryRowKey(conn);
+                        if (!unsupportedTables.isEmpty()) {
+                            logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n"
+                                    + Joiner.on(' ').join(unsupportedTables));
                         }
+                    } catch (Exception ex) {
+                        logger.error(
+                                "Unable to determine tables requiring upgrade due to PHOENIX-2067",
+                                ex);
                     } finally {
-                        if (metatable != null) {
-                            metatable.close();
-                        }
+                        conn.close();
+                    }
+                }
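+                // Add these columns one at a time, each with a different timestamp, so that
+                // if folks have already run the upgrade code for a snapshot, we'll still enter
+                // this block (and do the parts we haven't yet done). The 4.7.0 and 4.8.0
+                // steps below follow the same scheme by using "MIN_SYSTEM_TABLE_TIMESTAMP_x - n"
+                // offsets for their individual sub-steps.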
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
+                    columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " "
+                            + PBoolean.INSTANCE.getSqlTypeName();
+                    metaConnection = addColumnsIfNotExists(metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
+                }
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
+                    // Drop old stats table so that new stats table is created
+                    metaConnection = dropStatsTable(metaConnection,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
+                            PhoenixDatabaseMetaData.TRANSACTIONAL + " "
+                                    + PBoolean.INSTANCE.getSqlTypeName());
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
+                            PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " "
+                                    + PLong.INSTANCE.getSqlTypeName());
+                    metaConnection = setImmutableTableIndexesImmutable(metaConnection,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
+                    metaConnection = updateSystemCatalogTimestamp(metaConnection,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
+                    ConnectionQueryServicesImpl.this.removeTable(null,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
+                    clearCache();
+                }
+
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
+                            PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
+                                    + PBoolean.INSTANCE.getSqlTypeName());
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
+                            PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
+                                    + PVarchar.INSTANCE.getSqlTypeName());
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
+                            PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
+                                    + PBoolean.INSTANCE.getSqlTypeName());
+                    metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
+                    if (getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
+                            QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
+                        metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
                     }
+                    ConnectionQueryServicesImpl.this.removeTable(null,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
+                    clearCache();
                 }
+            }
+
 
-                /**
-                 * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
-                 * making use of HBase's checkAndPut api.
-                 * <p>
-                 * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the
-                 * version cell will be null i.e. the {@value QueryConstants#UPGRADE_MUTEX} column will be non-existent. For client's
-                 * upgrading to a release newer than 4.8.1 the existing version cell will be non-null. The client which
-                 * wins the race will end up setting the version cell to the {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP}
-                 * for the release.
-                 * </p>
-                 *
-                 * @return true if client won the race, false otherwise
-                 * @throws IOException
-                 * @throws SQLException
-                 */
-                private boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException,
-                SQLException {
-                    Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
-                    try (HTableInterface sysCatalogTable = getTable(sysCatalogTableName)) {
-                        byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
-                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
-                        byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
-                        byte[] qualifier = QueryConstants.UPGRADE_MUTEX;
-                        byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 ? null
-                                : PLong.INSTANCE.toBytes(currentServerSideTableTimestamp);
-                        byte[] newValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
-                        // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX column isn't used
-                        // to calculate SYSTEM.CATALOG's server side timestamp.
-                        Put put = new Put(row);
-                        put.add(family, qualifier, newValue);
-                        boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put);
-                        if (!acquired) { throw new UpgradeInProgressException(
-                                getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
-                        return true;
+            int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(
+                    QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+            try {
+                String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets);
+                metaConnection.createStatement().executeUpdate(createSequenceTable);
+                nSequenceSaltBuckets = nSaltBuckets;
+            } catch (NewerTableAlreadyExistsException e) {
+                // Ignore, as this will happen if SYSTEM.SEQUENCE already exists at this fixed
+                // timestamp. A TableAlreadyExistsException is not thrown, since the table only
+                // exists *after* this fixed timestamp. The only action taken is to set
+                // nSequenceSaltBuckets from the exception.
+                nSequenceSaltBuckets = getSaltBuckets(e);
+            } catch (TableAlreadyExistsException e) {
+                // This will occur if we have an older SYSTEM.SEQUENCE and we need to
+                // update it to include any new columns we've added (and, before 4.2.1,
+                // to run the sequence table upgrade).
+                long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
+                    // If the table time stamp is before 4.1.0 then we need to add the columns below to the
+                    // SYSTEM.SEQUENCE table. NOTE(review): the call below targets SYSTEM_CATALOG -- confirm intent.
+                    String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " "
+                            + PLong.INSTANCE.getSqlTypeName() + ", "
+                            + PhoenixDatabaseMetaData.MAX_VALUE + " "
+                            + PLong.INSTANCE.getSqlTypeName() + ", "
+                            + PhoenixDatabaseMetaData.CYCLE_FLAG + " "
+                            + PBoolean.INSTANCE.getSqlTypeName() + ", "
+                            + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " "
+                            + PBoolean.INSTANCE.getSqlTypeName();
+                    addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
+                }
+                // If the table timestamp is before 4.2.1 then run the upgrade script
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) {
+                    if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
+                        metaConnection.removeTable(null,
+                                PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA,
+                                PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE,
+                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+                        clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
+                                PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES,
+                                PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES,
+                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+                        clearTableRegionCache(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES);
                     }
+                    nSequenceSaltBuckets = nSaltBuckets;
+                } else {
+                    nSequenceSaltBuckets = getSaltBuckets(e);
                 }
-            });
+            }
+            try {
+                metaConnection.createStatement().executeUpdate(
+                        QueryConstants.CREATE_STATS_TABLE_METADATA);
+            } catch (NewerTableAlreadyExistsException ignore) {} catch (TableAlreadyExistsException e) {
+                long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            SYSTEM_STATS_NAME,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+                            PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
+                                    + PLong.INSTANCE.getSqlTypeName());
+                }
+            }
+            try {
+                metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
+            } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
+            if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
+                    ConnectionQueryServicesImpl.this.getProps())) {
+                try {
+                    metaConnection.createStatement().executeUpdate(
+                            "CREATE SCHEMA IF NOT EXISTS "
+                                    + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+                } catch (NewerSchemaAlreadyExistsException e) {}
+            }
+            ConnectionQueryServicesImpl.this.upgradeRequired.set(false);
+            success = true;
+        } catch (UpgradeInProgressException | UpgradeNotRequiredException e) {
+            // don't set it as initializationException because otherwise client won't be able to retry
+            throw e;
         } catch (Exception e) {
-            Throwables.propagateIfInstanceOf(e, SQLException.class);
-            Throwables.propagate(e);
+            if (e instanceof SQLException) {
+                toThrow = (SQLException)e;
+            } else {
+                // wrap every other exception into a SQLException
+                toThrow = new SQLException(e);
+            }
+        } finally {
+            try {
+                if (metaConnection != null) {
+                    metaConnection.close();
+                }
+            } catch (SQLException e) {
+                if (toThrow != null) {
+                    toThrow.setNextException(e);
+                } else {
+                    toThrow = e;
+                }
+            } finally {
+                try {
+                    restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
+                } catch (SQLException e) {
+                    if (toThrow != null) {
+                        toThrow.setNextException(e);
+                    } else {
+                        toThrow = e;
+                    }
+                }
+                if (toThrow != null) { throw toThrow; }
+            }
+        }
+    }
+
+    /**
+     * Takes an HBase snapshot named {@code snapshotName} of the table {@code tableName}.
+     * <p>
+     * Any exception raised while obtaining the admin or taking the snapshot is wrapped in a
+     * {@link SQLException}. The admin is always closed; a failure while closing is either thrown
+     * on its own or chained to the earlier failure via
+     * {@link SQLException#setNextException(SQLException)}.
+     * </p>
+     *
+     * @param snapshotName name to give the new snapshot
+     * @param tableName name of the table to snapshot
+     * @throws SQLException if taking the snapshot or closing the admin fails
+     */
+    private void createSnapshot(String snapshotName, String tableName)
+            throws SQLException {
+        HBaseAdmin admin = null;
+        // First failure seen; later failures are chained onto it rather than replacing it.
+        SQLException sqlE = null;
+        try {
+            admin = getAdmin();
+            admin.snapshot(snapshotName, tableName);
+            logger.info("Successfully created snapshot " + snapshotName + " for "
+                    + tableName);
+        } catch (Exception e) {
+            sqlE = new SQLException(e);
+        } finally {
+            try {
+                if (admin != null) {
+                    admin.close();
+                }
+            } catch (Exception e) {
+                SQLException adminCloseEx = new SQLException(e);
+                if (sqlE == null) {
+                    sqlE = adminCloseEx;
+                } else {
+                    sqlE.setNextException(adminCloseEx);
+                }
+            } finally {
+                // Thrown from the innermost finally so it runs after the close attempt.
+                if (sqlE != null) {
+                    throw sqlE;
+                }
+            }
         }
     }
 
-    private static class UpgradeInProgressException extends SQLException {
-        public UpgradeInProgressException(String upgradeFrom, String upgradeTo) {
-            super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo
-                    + ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS
-                    .getSQLState(), SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode());
+    /**
+     * Rolls {@code tableName} back to {@code snapshotName} after a failed upgrade.
+     * <p>
+     * Does nothing when {@code success} is true or when no snapshot name was supplied. Otherwise
+     * the table is disabled, the snapshot is restored and the table is re-enabled. The table is
+     * re-enabled whenever it was disabled, even if the restore itself failed, and the admin is
+     * closed whenever it was obtained. Every failure along the way is wrapped in a
+     * {@link SQLException}; the first one is thrown and later ones are chained to it via
+     * {@link SQLException#setNextException(SQLException)}.
+     * </p>
+     *
+     * @param tableName name of the table to restore
+     * @param snapshotName name of the snapshot to restore from; may be null
+     * @param success whether the upgrade succeeded
+     * @throws SQLException if obtaining the admin, disabling, restoring, re-enabling or closing
+     *             fails
+     */
+    private void restoreFromSnapshot(String tableName, String snapshotName,
+            boolean success) throws SQLException {
+        if (success || snapshotName == null) {
+            return;
+        }
+        boolean snapshotRestored = false;
+        boolean tableDisabled = false;
+        SQLException sqlE = null;
+        HBaseAdmin admin = null;
+        try {
+            logger.warn("Starting restore of " + tableName + " using snapshot "
+                    + snapshotName + " because upgrade failed");
+            admin = getAdmin();
+            admin.disableTable(tableName);
+            tableDisabled = true;
+            admin.restoreSnapshot(snapshotName);
+            snapshotRestored = true;
+            logger.warn("Successfully restored " + tableName + " using snapshot "
+                    + snapshotName);
+        } catch (Exception e) {
+            sqlE = new SQLException(e);
+        } finally {
+            try {
+                // Only re-enable a table that this method actually disabled.
+                if (admin != null && tableDisabled) {
+                    admin.enableTable(tableName);
+                    if (snapshotRestored) {
+                        logger.warn("Successfully restored and enabled " + tableName + " using snapshot "
+                                + snapshotName);
+                    } else {
+                        logger.warn("Successfully enabled " + tableName + " after restoring using snapshot "
+                                + snapshotName + " failed. ");
+                    }
+                }
+            } catch (Exception e1) {
+                SQLException enableTableEx = new SQLException(e1);
+                if (sqlE == null) {
+                    sqlE = enableTableEx;
+                } else {
+                    sqlE.setNextException(enableTableEx);
+                }
+                logger.error("Failure in enabling "
+                        + tableName
+                        + (snapshotRestored ? " after successfully restoring using snapshot "
+                                + snapshotName
+                                : " after restoring using snapshot "
+                                        + snapshotName + " failed. "));
+            } finally {
+                try {
+                    // Close the admin even when the table was never disabled, so the handle
+                    // is not leaked when disableTable() fails.
+                    if (admin != null) {
+                        admin.close();
+                    }
+                } catch (Exception e2) {
+                    SQLException adminCloseEx = new SQLException(e2);
+                    if (sqlE == null) {
+                        sqlE = adminCloseEx;
+                    } else {
+                        sqlE.setNextException(adminCloseEx);
+                    }
+                } finally {
+                    // Report the failure regardless of whether the table was disabled, so that
+                    // errors from getAdmin() or disableTable() are not silently dropped.
+                    if (sqlE != null) {
+                        throw sqlE;
+                    }
+                }
+            }
+        }
+    }
+    
+    /**
+     * Maps the existing system tables to the system namespace when namespace mapping is enabled
+     * for system tables.
+     * <p>
+     * Returns immediately when namespace mapping is disabled or when no table matching the
+     * system-schema pattern is found. SYSTEM.CATALOG is handled first (and only mapped if its
+     * namespace-mapped physical table does not exist yet); the remaining tables are mapped
+     * afterwards, each one being dropped from the client-side cache via {@code removeTable}.
+     * </p>
+     *
+     * @param props properties consulted for the namespace-mapping settings
+     * @throws IllegalArgumentException if more than 4 matching system tables are found
+     */
+    private void ensureSystemTablesUpgraded(ReadOnlyProps props)
+            throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+        boolean systemTablesMapped = SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props);
+        if (!systemTablesMapped) {
+            return;
+        }
+        HTableInterface catalogTable = null;
+        try (HBaseAdmin admin = getAdmin()) {
+            ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME);
+            String systemTablePattern = QueryConstants.SYSTEM_SCHEMA_NAME + "\\..*";
+            List<HTableDescriptor> descriptors = Arrays.asList(admin.listTables(systemTablePattern));
+            List<String> pending = getTableNames(descriptors);
+            if (pending.isEmpty()) {
+                return;
+            }
+            if (pending.size() > 4) {
+                throw new IllegalArgumentException(
+                        "Expected 4 system table only but found " + pending.size() + ":" + pending);
+            }
+            byte[] mappedCatalogName = SchemaUtil
+                    .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName();
+            catalogTable = getTable(mappedCatalogName);
+            // SYSTEM.CATALOG goes first and is taken off the list so the loop below skips it.
+            if (pending.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)) {
+                boolean alreadyMapped = admin.tableExists(mappedCatalogName);
+                if (!alreadyMapped) {
+                    UpgradeUtil.mapTableToNamespace(admin, catalogTable,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM,
+                            null);
+                    ConnectionQueryServicesImpl.this.removeTable(null,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
+                }
+                pending.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+            }
+            for (String systemTable : pending) {
+                UpgradeUtil.mapTableToNamespace(admin, catalogTable, systemTable, props, null,
+                        PTableType.SYSTEM, null);
+                ConnectionQueryServicesImpl.this.removeTable(null, systemTable, null,
+                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
+            }
+            if (!pending.isEmpty()) {
+                clearCache();
+            }
+        } finally {
+            if (catalogTable != null) {
+                catalogTable.close();
+            }
+        }
+    }
+    
+    /**
+     * Acquires a distributed mutex of sorts, via HBase's checkAndPut api, so that only one JVM
+     * gets to run the upgrade code.
+     * <p>
+     * The mutex is the {@link QueryConstants#UPGRADE_MUTEX} cell on SYSTEM.CATALOG's own row.
+     * When the current server-side timestamp is older than the 4.8.1 timestamp the cell is
+     * expected to be absent (null); otherwise it is expected to hold the current server-side
+     * timestamp. The client that wins the race writes
+     * {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP} into the cell.
+     * </p>
+     *
+     * @param currentServerSideTableTimestamp current server-side timestamp of SYSTEM.CATALOG;
+     *            must be older than {@code MIN_SYSTEM_TABLE_TIMESTAMP}
+     * @param sysCatalogTableName physical name of the SYSTEM.CATALOG table
+     * @return true when this client won the race; a lost race is reported by exception, never
+     *         by returning false
+     * @throws UpgradeInProgressException if the checkAndPut did not succeed
+     * @throws IOException if the HBase table access fails
+     * @throws SQLException if the table cannot be obtained
+     */
+    @VisibleForTesting
+    public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException,
+            SQLException {
+        Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
+        try (HTableInterface catalog = getTable(sysCatalogTableName)) {
+            byte[] mutexRow = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+            byte[] mutexFamily = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+            byte[] mutexQualifier = QueryConstants.UPGRADE_MUTEX;
+            byte[] expectedValue;
+            if (currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1) {
+                // The mutex column is expected not to exist yet in this case.
+                expectedValue = null;
+            } else {
+                expectedValue = PLong.INSTANCE.toBytes(currentServerSideTableTimestamp);
+            }
+            byte[] claimedValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
+            // The timestamp of this put is irrelevant: the UPGRADE_MUTEX column does not take
+            // part in computing SYSTEM.CATALOG's server side timestamp.
+            Put claim = new Put(mutexRow);
+            claim.addColumn(mutexFamily, mutexQualifier, claimedValue);
+            if (catalog.checkAndPut(mutexRow, mutexFamily, mutexQualifier, expectedValue, claim)) {
+                return true;
+            }
+            throw new UpgradeInProgressException(
+                    getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP));
         }
     }
 
@@ -3879,4 +3967,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public void invalidateStats(ImmutableBytesPtr tableName) {
         this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
     }
+
+    /**
+     * Returns the current value of the {@code upgradeRequired} flag.
+     */
+    @Override
+    public boolean isUpgradeRequired() {
+        return upgradeRequired.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 560b5d9..337e43c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -650,4 +650,12 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     public void invalidateStats(ImmutableBytesPtr tableName) {
         this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
     }
+
+    /**
+     * Intentionally empty in this implementation.
+     * NOTE(review): presumably because a connectionless service has no cluster metadata to
+     * upgrade; confirm.
+     */
+    @Override
+    public void upgradeSystemTables(String url, Properties props) throws SQLException {}
+
+    /**
+     * Always reports that no upgrade is required for this implementation.
+     */
+    @Override
+    public boolean isUpgradeRequired() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 99ad59c..81517e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -345,4 +345,14 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public void invalidateStats(ImmutableBytesPtr tableName) {
         getDelegate().invalidateStats(tableName);
     }
+
+    /**
+     * Forwards the call unchanged to the wrapped delegate.
+     */
+    @Override
+    public void upgradeSystemTables(String url, Properties props) throws SQLException {
+        getDelegate().upgradeSystemTables(url, props);
+    }
+
+    /**
+     * Returns whatever the wrapped delegate reports.
+     */
+    @Override
+    public boolean isUpgradeRequired() {
+        return getDelegate().isUpgradeRequired();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 8cd009a..51a18d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -224,6 +224,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_ASYNC_BUILD_ENABLED = "phoenix.index.async.build.enabled";
     
     public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding";
+    public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled";
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 669bcd2..9b87361 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
 import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
 import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE;
+import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
@@ -257,6 +258,7 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_INDEX_ASYNC_BUILD_ENABLED = true;
     
     public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString();
+    public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
 
     @SuppressWarnings("serial")
     public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
@@ -334,7 +336,8 @@ public class QueryServicesOptions {
             .setIfUnset(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE)
             .setIfUnset(IS_NAMESPACE_MAPPING_ENABLED, DEFAULT_IS_NAMESPACE_MAPPING_ENABLED)
             .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE)
-            .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE);
+            .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)
+            .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED);
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
         // it to 1, so we'll change it.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 28ed11d..73f1501 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -3406,7 +3406,7 @@ public class MetaDataClient {
                                 String indexTenantId = entry.getKey();
                                 Properties props = new Properties(connection.getClientInfo());
                                 props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, indexTenantId);
-                                try (PhoenixConnection tenantConn = DriverManager.getConnection(connection.getURL(), props).unwrap(PhoenixConnection.class)) {
+                                try (PhoenixConnection tenantConn = new PhoenixConnection(connection, connection.getQueryServices(), props)) {
                                     PostDDLCompiler dropCompiler = new PostDDLCompiler(tenantConn);
                                     tenantConn.getQueryServices().updateData(dropCompiler.compile(entry.getValue(), null, null, Collections.<PColumn>emptyList(), ts));
                                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index a8e80ab..fea6d61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -38,16 +38,23 @@ import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.metrics.MetricInfo;
 import org.apache.phoenix.metrics.Metrics;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -93,7 +100,7 @@ public class PhoenixMetricsSink implements MetricsSink {
     private Connection conn;
 
     private String table;
-
+    
     public PhoenixMetricsSink() {
         LOG.info("Writing tracing metrics to phoenix table");
 
@@ -133,14 +140,25 @@ public class PhoenixMetricsSink implements MetricsSink {
             }
         }
     }
-
+    
     private void initializeInternal(Connection conn, String tableName) throws SQLException {
         this.conn = conn;
-
         // ensure that the target table already exists
-        createTable(conn, tableName);
+        if (!traceTableExists(conn, tableName)) {
+            createTable(conn, tableName);
+        }
+        this.table = tableName;
     }
-
+    
+    /**
+     * Checks whether the trace table can be resolved through {@link PhoenixRuntime#getTable}.
+     *
+     * @param conn connection used for the lookup
+     * @param traceTableName name of the trace table
+     * @return false when the lookup raises {@link TableNotFoundException}, true otherwise
+     * @throws SQLException for any other lookup failure
+     */
+    private boolean traceTableExists(Connection conn, String traceTableName) throws SQLException {
+        boolean found = true;
+        try {
+            PhoenixRuntime.getTable(conn, traceTableName);
+        } catch (TableNotFoundException notFound) {
+            found = false;
+        }
+        return found;
+    }
+    
     /**
      * Used for <b>TESTING ONLY</b>
      * Initialize the connection and setup the table to use the
@@ -183,10 +201,8 @@ public class PhoenixMetricsSink implements MetricsSink {
                         // tables created as transactional tables, make these table non
                         // transactional
                         PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
-;
         PreparedStatement stmt = conn.prepareStatement(ddl);
         stmt.execute();
-        this.table = table;
     }
 
     @Override
@@ -281,7 +297,12 @@ public class PhoenixMetricsSink implements MetricsSink {
             for (String tag : variableValues) {
                 ps.setString(index++, tag);
             }
-            ps.execute();
+            // Not going through the standard route of using statement.execute() as that code path
+            // is blocked if the metadata hasn't been upgraded to the new minor release.
+            MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt);
+            MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+            MutationState newState = plan.execute();
+            state.join(newState);
         } catch (SQLException e) {
             LOG.error("Could not write metric: \n" + record + " to prepared statement:\n" + stmt,
                     e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index a690dd8..764d135 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -172,7 +172,6 @@ public class PhoenixRuntime {
             AUTO_COMMIT_ATTRIB,
             CONSISTENCY_ATTRIB,
             REQUEST_METRIC_ATTRIB,
-            NO_UPGRADE_ATTRIB
             };
 
     /**
@@ -215,6 +214,7 @@ public class PhoenixRuntime {
                 props.setProperty(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, "false");
             }
             conn = DriverManager.getConnection(jdbcUrl, props).unwrap(PhoenixConnection.class);
+            conn.setRunningUpgrade(true);
             if (execCmd.isMapNamespace()) {
                 String srcTable = execCmd.getSrcTable();
                 System.out.println("Starting upgrading table:" + srcTable + "... please don't kill it in between!!");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 6d8e00d..bab52a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -310,7 +310,7 @@ public final class QueryUtil {
     }
 
     /**
-     * @return {@link PhoenixConnection} with NO_UPGRADE_ATTRIB set so that we don't initiate server upgrade
+     * @return {@link PhoenixConnection} with {@value UpgradeUtil#DO_NOT_UPGRADE} set so that we don't initiate server upgrade
      */
     public static Connection getConnectionOnServer(Configuration conf) throws ClassNotFoundException,
             SQLException {
@@ -318,12 +318,12 @@ public final class QueryUtil {
     }
 
     /**
-     * @return {@link PhoenixConnection} with NO_UPGRADE_ATTRIB set so that we don't initiate server upgrade
+     * @return {@link PhoenixConnection} with {@value UpgradeUtil#DO_NOT_UPGRADE} set so that we don't initiate metadata upgrade.
      */
     public static Connection getConnectionOnServer(Properties props, Configuration conf)
             throws ClassNotFoundException,
             SQLException {
-        props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, Boolean.TRUE.toString());
+        UpgradeUtil.doNotUpgradeOnFirstConnection(props);
         return getConnection(props, conf);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 8bc3e63..cddebb7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -132,7 +132,12 @@ public class UpgradeUtil {
     private static final Logger logger = LoggerFactory.getLogger(UpgradeUtil.class);
     private static final byte[] SEQ_PREFIX_BYTES = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("_SEQ_"));
     public static final byte[] UPGRADE_TO_4_7_COLUMN_NAME = Bytes.toBytes("UPGRADE_TO_4_7");
-    
+    /**
+     * Attribute for Phoenix's internal purposes only. When this attribute is set on a phoenix connection, then
+     * the upgrade code for upgrading the cluster to the new minor release is not triggered. Note that presence 
+     * of this attribute overrides a true value for {@value QueryServices#AUTO_UPGRADE_ENABLED}.     
+     */
+    private static final String DO_NOT_UPGRADE = "DoNotUpgrade";
     public static String UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW = "UPSERT "
             + "INTO SYSTEM.CATALOG "
             + "(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, BASE_COLUMN_COUNT) "
@@ -174,7 +179,7 @@ public class UpgradeUtil {
             + " FROM " + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_CATALOG_TABLE + " WHERE " + COLUMN_FAMILY + " = ? AND "
             + LINK_TYPE + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue() + " AND ( " + TABLE_TYPE + "=" + "'"
             + PTableType.VIEW.getSerializedValue() + "' OR " + TABLE_TYPE + " IS NULL) ORDER BY "+TENANT_ID;
-
+    
     private UpgradeUtil() {
     }
 
@@ -1498,7 +1503,7 @@ public class UpgradeUtil {
             }
             throw new SQLException(buf.toString());
         }
-        PhoenixConnection upgradeConn = new PhoenixConnection(conn, true);
+        PhoenixConnection upgradeConn = new PhoenixConnection(conn, true, true);
         try {
             upgradeConn.setAutoCommit(true);
             for (PTable table : tablesNeedingUpgrading) {
@@ -1898,4 +1903,12 @@ public class UpgradeUtil {
         String upgradingFrom = getVersion(currentSystemTableTimestamp);
         return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date;
     }
+    
+    /**
+     * Tells whether the internal {@code DO_NOT_UPGRADE} attribute is set to true in the given
+     * properties.
+     *
+     * @param props properties to inspect
+     * @return true only when the attribute is present and equals "true", ignoring case
+     */
+    public static boolean isNoUpgradeSet(Properties props) {
+        String doNotUpgrade = props.getProperty(DO_NOT_UPGRADE);
+        return Boolean.parseBoolean(doNotUpgrade);
+    }
+    
+    /**
+     * Sets the internal {@code DO_NOT_UPGRADE} attribute to "true" on the given properties.
+     *
+     * @param props properties to modify in place
+     */
+    public static void doNotUpgradeOnFirstConnection(Properties props) {
+        String enabled = Boolean.TRUE.toString();
+        props.setProperty(DO_NOT_UPGRADE, enabled);
+    }
 }
\ No newline at end of file