You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/11/02 21:20:52 UTC

phoenix git commit: Port snapshot and upgrade mutex fixes to 4.8 branches

Repository: phoenix
Updated Branches:
  refs/heads/4.8-HBase-0.98 da00849cd -> 0b56aa9f6


Port snapshot and upgrade mutex fixes to 4.8 branches


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0b56aa9f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0b56aa9f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0b56aa9f

Branch: refs/heads/4.8-HBase-0.98
Commit: 0b56aa9f6b6c8941e3751bf41a34ba7707a0847f
Parents: da00849
Author: Samarth <sa...@salesforce.com>
Authored: Wed Nov 2 14:17:20 2016 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Wed Nov 2 14:17:20 2016 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/UpgradeIT.java   | 101 ++++++++++++++++
 .../phoenix/coprocessor/MetaDataProtocol.java   |   2 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   5 +
 .../query/ConnectionQueryServicesImpl.java      | 121 ++++++++++++-------
 .../apache/phoenix/query/QueryConstants.java    |   1 -
 .../org/apache/phoenix/util/UpgradeUtil.java    |   5 +-
 6 files changed, 189 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index 6722b67..2059c78 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -24,7 +24,9 @@ import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_
 import static org.apache.phoenix.util.UpgradeUtil.SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -33,6 +35,11 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -41,8 +48,12 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl.UpgradeInProgressException;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PName;
@@ -556,7 +567,97 @@ public class UpgradeIT extends BaseHBaseManagedTimeIT {
             }
         }
     }
+    
+    @Test
+    public void testAcquiringAndReleasingUpgradeMutex() throws Exception {
+        ConnectionQueryServices services = null;
+        byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                generateRandomString());
+        try (Connection conn = getConnection(false, null)) {
+            services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            assertTrue(((ConnectionQueryServicesImpl)services)
+                    .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey));
+            try {
+                ((ConnectionQueryServicesImpl)services)
+                        .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey);
+                fail();
+            } catch (UpgradeInProgressException expected) {
 
+            }
+            assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
+            assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
+        }
+    }
+    
+    @Test
+    public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Exception {
+        final AtomicBoolean mutexStatus1 = new AtomicBoolean(false);
+        final AtomicBoolean mutexStatus2 = new AtomicBoolean(false);
+        final CountDownLatch latch = new CountDownLatch(2);
+        final AtomicInteger numExceptions = new AtomicInteger(0);
+        ConnectionQueryServices services = null;
+        try (Connection conn = getConnection(false, null)) {
+            services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            final byte[] mutexKey = Bytes.toBytes(generateRandomString());
+            FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey));
+            FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey));
+            Thread t1 = new Thread(task1);
+            t1.setDaemon(true);
+            Thread t2 = new Thread(task2);
+            t2.setDaemon(true);
+            t1.start();
+            t2.start();
+            latch.await();
+            // make sure tasks didn't fail by calling get()
+            task1.get();
+            task2.get();
+            assertTrue("One of the threads should have acquired the mutex", mutexStatus1.get() || mutexStatus2.get());
+            assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(),
+                    mutexStatus2.get());
+            assertEquals("One and only one thread should have caught UpgradeRequiredException ", 1, numExceptions.get());
+        } finally {
+            if (services != null) {
+                releaseUpgradeMutex(services);
+            }
+        }
+    }
+    
+    private void releaseUpgradeMutex(ConnectionQueryServices services) {
+        byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+        ((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey);
+        
+    }
+    
+    private static class AcquireMutexRunnable implements Callable<Void> {
+        
+        private final AtomicBoolean acquireStatus;
+        private final ConnectionQueryServices services;
+        private final CountDownLatch latch;
+        private final AtomicInteger numExceptions;
+        private final byte[] mutexRowKey;
+        public AcquireMutexRunnable(AtomicBoolean acquireStatus, ConnectionQueryServices services, CountDownLatch latch, AtomicInteger numExceptions, byte[] mutexKey) {
+            this.acquireStatus = acquireStatus;
+            this.services = services;
+            this.latch = latch;
+            this.numExceptions = numExceptions;
+            this.mutexRowKey = mutexKey;
+        }
+        @Override
+        public Void call() throws Exception {
+            try {
+                ((ConnectionQueryServicesImpl)services).acquireUpgradeMutex(
+                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey);
+                acquireStatus.set(true);
+            } catch (UpgradeInProgressException e) {
+                numExceptions.incrementAndGet();
+            } finally {
+                latch.countDown();
+            }
+            return null;
+        }
+        
+    }
     private Connection createTenantConnection(String tenantId) throws SQLException {
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 3b57981..3c3267f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -63,7 +63,7 @@ import com.google.protobuf.ByteString;
 public abstract class MetaDataProtocol extends MetaDataService {
     public static final int PHOENIX_MAJOR_VERSION = 4;
     public static final int PHOENIX_MINOR_VERSION = 8;
-    public static final int PHOENIX_PATCH_NUMBER = 1;
+    public static final int PHOENIX_PATCH_NUMBER = 2;
     public static final int PHOENIX_VERSION =
             VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 17c2a1c..3d2347b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -315,6 +315,11 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     /** Version below which we fall back on the generic KeyValueBuilder */
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
     
+    public static final String SYSTEM_MUTEX_TABLE_NAME = "MUTEX";
+    public static final String SYSTEM_MUTEX_NAME = SchemaUtil.getTableName(QueryConstants.SYSTEM_SCHEMA_NAME, SYSTEM_MUTEX_TABLE_NAME);
+    public static final byte[] SYSTEM_MUTEX_NAME_BYTES = Bytes.toBytes(SYSTEM_MUTEX_NAME);
+    public static final byte[] SYSTEM_MUTEX_FAMILY_NAME_BYTES = TABLE_FAMILY_BYTES;
+    
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
         this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 3dbbf09..ac4cf5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -32,7 +32,7 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
-import static org.apache.phoenix.util.UpgradeUtil.getUpgradeSnapshotName;
+import static org.apache.phoenix.util.UpgradeUtil.getSysCatalogSnapshotName;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -83,8 +84,10 @@ import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
@@ -224,6 +227,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
     private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100;
     private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000;
+    private static final int TTL_FOR_MUTEX = 15 * 60; // 15min 
     protected final Configuration config;
     private final ConnectionInfo connectionInfo;
     // Copy of config.getProps(), but read-only to prevent synchronization that we
@@ -267,6 +271,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues;
     private ScheduledExecutorService renewLeaseExecutor;
     private final boolean renewLeaseEnabled;
+    private static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
+    private static final byte[] UPGRADE_MUTEX_VALUE = UPGRADE_MUTEX;
 
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
@@ -2312,6 +2318,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         String snapshotName = null;
                         String sysCatalogTableName = null;
                         boolean hConnectionEstablished = false;
+                        boolean acquiredMutex = false;
+                        byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+                        boolean snapshotCreated = false;
                         try {
                             openConnection();
                             hConnectionEstablished = true;
@@ -2353,9 +2363,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 if (upgradeSystemTables) {
                                     long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
                                     sysCatalogTableName = e.getTable().getPhysicalName().getString();
-                                    if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable().getPhysicalName().getBytes())) {
-                                        snapshotName = getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp);
+                                    if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP
+                                            && (acquiredMutex = acquireUpgradeMutex(currentServerSideTableTimeStamp, e
+                                                    .getTable().getPhysicalName().getBytes()))) {
+                                        snapshotName = getSysCatalogSnapshotName(currentServerSideTableTimeStamp);
                                         createSnapshot(snapshotName, sysCatalogTableName);
+                                        snapshotCreated = true;
                                     }
                                     String columnsToAdd = "";
                                     // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
@@ -2588,7 +2601,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 }
                             } finally {
                                 try {
-                                    restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
+                                    if (snapshotCreated) {
+                                        restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
+                                    }
                                 } catch (SQLException e) {
                                     if (initializationException != null) {
                                         initializationException.setNextException(e);
@@ -2609,6 +2624,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                         }
                                     } finally {
                                         initialized = true;
+                                        if (acquiredMutex) {
+                                            releaseUpgradeMutex(mutexRowKey);
+                                        }
                                         if (initializationException != null) {
                                             throw initializationException;
                                         }
@@ -2756,43 +2774,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         }
                     }
                 }
-                
-                /**
-                 * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
-                 * making use of HBase's checkAndPut api.
-                 * <p>
-                 * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the
-                 * version cell will be null i.e. the {@value QueryConstants#UPGRADE_MUTEX} column will be non-existent. For client's
-                 * upgrading to a release newer than 4.8.1 the existing version cell will be non-null. The client which
-                 * wins the race will end up setting the version cell to the {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP}
-                 * for the release.
-                 * </p>
-                 * 
-                 * @return true if client won the race, false otherwise
-                 * @throws IOException
-                 * @throws SQLException
-                 */
-                private boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException,
-                        SQLException {
-                    Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
-                    try (HTableInterface sysCatalogTable = getTable(sysCatalogTableName)) {
-                        byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
-                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
-                        byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
-                        byte[] qualifier = QueryConstants.UPGRADE_MUTEX;
-                        byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 ? null
-                                : PLong.INSTANCE.toBytes(currentServerSideTableTimestamp);
-                        byte[] newValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
-                        // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX column isn't used
-                        // to calculate SYSTEM.CATALOG's server side timestamp.
-                        Put put = new Put(row);
-                        put.add(family, qualifier, newValue);
-                        boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put);
-                        if (!acquired) { throw new UpgradeInProgressException(
-                                getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
-                        return true;
-                    }
-                }
             });
         } catch (Exception e) {
             Throwables.propagateIfInstanceOf(e, SQLException.class);
@@ -2800,7 +2781,63 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
     
-    private static class UpgradeInProgressException extends SQLException {
+    /**
+     * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
+     * making use of HBase's checkAndPut api.
+     * 
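+     * @return true if this client won the race; false is never returned because losing the race throws instead
+     * @throws IOException if creating or accessing the mutex table fails
+     * @throws SQLException an {@link UpgradeInProgressException} if another client already holds the mutex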
+     */
+    @VisibleForTesting
+    public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] rowToLock) throws IOException,
+            SQLException {
+        Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
+        try (HBaseAdmin admin = getAdmin()) {
+            try {
+                HTableDescriptor tableDesc = new HTableDescriptor(
+                        TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES));
+                HColumnDescriptor columnDesc = new HColumnDescriptor(
+                        PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES);
+                columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time
+                tableDesc.addFamily(columnDesc);
+                admin.createTable(tableDesc);
+            } catch (TableExistsException e) {
+                // Ignore: the mutex table already exists, so there is nothing left to create
+            }
+        }
+        try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+            byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
+            byte[] qualifier = UPGRADE_MUTEX;
+            byte[] oldValue = null;
+            byte[] newValue = UPGRADE_MUTEX_VALUE;
+            Put put = new Put(rowToLock);
+            put.add(family, qualifier, newValue);
+            boolean acquired = sysMutexTable.checkAndPut(rowToLock, family, qualifier, oldValue, put);
+            if (!acquired) { throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
+                    getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
+            return true;
+        }
+    }
+    
+    @VisibleForTesting
+    public boolean releaseUpgradeMutex(byte[] mutexRowKey) {
+        boolean released = false;
+        try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+            byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
+            byte[] qualifier = UPGRADE_MUTEX;
+            byte[] expectedValue = UPGRADE_MUTEX_VALUE;
+            Delete delete = new Delete(mutexRowKey);
+            RowMutations mutations = new RowMutations(mutexRowKey);
+            mutations.add(delete);
+            released = sysMutexTable.checkAndMutate(mutexRowKey, family, qualifier, CompareOp.EQUAL, expectedValue, mutations);
+        } catch (Exception e) {
+            logger.warn("Release of upgrade mutex failed", e);
+        }
+        return released;
+    }
+    
+    public static class UpgradeInProgressException extends SQLException {
         public UpgradeInProgressException(String upgradeFrom, String upgradeTo) {
             super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo
                     + ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index ace228b..b3a5a36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -362,6 +362,5 @@ public interface QueryConstants {
     public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
     public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
     public static final String LAST_SCAN = "LAST_SCAN";
-    public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index b205c4a..9b19448 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -1894,8 +1894,9 @@ public class UpgradeUtil {
         }
     }
 
-    public static final String getUpgradeSnapshotName(String tableString, long currentSystemTableTimestamp) {
-        Format formatter = new SimpleDateFormat("yyyyMMddHHmmssZ");
+    public static final String getSysCatalogSnapshotName(long currentSystemTableTimestamp) {
+        String tableString = SYSTEM_CATALOG_NAME;
+        Format formatter = new SimpleDateFormat("yyyyMMddHHmmss");
         String date = formatter.format(new Date(System.currentTimeMillis()));
         String upgradingFrom = getVersion(currentSystemTableTimestamp);
         return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date;