You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/11/02 21:20:52 UTC
phoenix git commit: Port snapshot and upgrade mutex fixes to 4.8
branches
Repository: phoenix
Updated Branches:
refs/heads/4.8-HBase-0.98 da00849cd -> 0b56aa9f6
Port snapshot and upgrade mutex fixes to 4.8 branches
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0b56aa9f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0b56aa9f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0b56aa9f
Branch: refs/heads/4.8-HBase-0.98
Commit: 0b56aa9f6b6c8941e3751bf41a34ba7707a0847f
Parents: da00849
Author: Samarth <sa...@salesforce.com>
Authored: Wed Nov 2 14:17:20 2016 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Wed Nov 2 14:17:20 2016 -0700
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/UpgradeIT.java | 101 ++++++++++++++++
.../phoenix/coprocessor/MetaDataProtocol.java | 2 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 5 +
.../query/ConnectionQueryServicesImpl.java | 121 ++++++++++++-------
.../apache/phoenix/query/QueryConstants.java | 1 -
.../org/apache/phoenix/util/UpgradeUtil.java | 5 +-
6 files changed, 189 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index 6722b67..2059c78 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -24,7 +24,9 @@ import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_
import static org.apache.phoenix.util.UpgradeUtil.SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.sql.Connection;
@@ -33,6 +35,11 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -41,8 +48,12 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl.UpgradeInProgressException;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PName;
@@ -556,7 +567,97 @@ public class UpgradeIT extends BaseHBaseManagedTimeIT {
}
}
}
+
+ /**
+ * Exercises the acquire/release cycle of the upgrade mutex on a single row: the first acquire
+ * succeeds, a second acquire on the same row throws UpgradeInProgressException, the first
+ * release returns true and a second release of the same row returns false.
+ */
+ @Test
+ public void testAcquiringAndReleasingUpgradeMutex() throws Exception {
+ ConnectionQueryServices services = null;
+ byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+ generateRandomString());
+ try (Connection conn = getConnection(false, null)) {
+ services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ assertTrue(((ConnectionQueryServicesImpl)services)
+ .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey));
+ try {
+ // The mutex is still held on this row, so a second acquire must not succeed
+ ((ConnectionQueryServicesImpl)services)
+ .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey);
+ fail();
+ } catch (UpgradeInProgressException expected) {
+ // expected: the row is already locked by the first acquire
+ }
+ // First release removes the mutex; the second one finds nothing left to release
+ assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
+ assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
+ }
+ }
+
+ /**
+ * Races two threads for the upgrade mutex on the same row and verifies that exactly one of them
+ * acquires it while the other one gets an UpgradeInProgressException. The mutex row locked by
+ * the winner is released again when the test finishes.
+ */
+ @Test
+ public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Exception {
+ final AtomicBoolean mutexStatus1 = new AtomicBoolean(false);
+ final AtomicBoolean mutexStatus2 = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(2);
+ final AtomicInteger numExceptions = new AtomicInteger(0);
+ // Declared outside the try block so the finally block can release the very row that was locked
+ final byte[] mutexKey = Bytes.toBytes(generateRandomString());
+ ConnectionQueryServices services = null;
+ try (Connection conn = getConnection(false, null)) {
+ services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey));
+ FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey));
+ Thread t1 = new Thread(task1);
+ t1.setDaemon(true);
+ Thread t2 = new Thread(task2);
+ t2.setDaemon(true);
+ t1.start();
+ t2.start();
+ latch.await();
+ // make sure tasks didn't fail by calling get()
+ task1.get();
+ task2.get();
+ assertTrue("One of the threads should have acquired the mutex", mutexStatus1.get() || mutexStatus2.get());
+ assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(),
+ mutexStatus2.get());
+ assertEquals("One and only one thread should have caught UpgradeInProgressException ", 1, numExceptions.get());
+ } finally {
+ if (services != null) {
+ releaseUpgradeMutex(services, mutexKey);
+ }
+ }
+ }
+
+ /**
+ * Releases the upgrade mutex held on the given row.
+ *
+ * @param services query services to release the mutex through; must be a ConnectionQueryServicesImpl
+ * @param mutexRowKey row key that was passed to acquireUpgradeMutex
+ */
+ private void releaseUpgradeMutex(ConnectionQueryServices services, byte[] mutexRowKey) {
+ ((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey);
+ }
+
+ /**
+ * Task that makes one attempt to acquire the upgrade mutex on a given row. A successful acquire
+ * sets acquireStatus to true, an UpgradeInProgressException increments numExceptions, and the
+ * latch is counted down in either case. Despite its name this is a Callable, not a Runnable.
+ */
+ private static class AcquireMutexRunnable implements Callable<Void> {
+
+ private final AtomicBoolean acquireStatus;
+ private final ConnectionQueryServices services;
+ private final CountDownLatch latch;
+ private final AtomicInteger numExceptions;
+ private final byte[] mutexRowKey;
+ public AcquireMutexRunnable(AtomicBoolean acquireStatus, ConnectionQueryServices services, CountDownLatch latch, AtomicInteger numExceptions, byte[] mutexKey) {
+ this.acquireStatus = acquireStatus;
+ this.services = services;
+ this.latch = latch;
+ this.numExceptions = numExceptions;
+ this.mutexRowKey = mutexKey;
+ }
+ @Override
+ public Void call() throws Exception {
+ try {
+ ((ConnectionQueryServicesImpl)services).acquireUpgradeMutex(
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey);
+ // Only reached when acquireUpgradeMutex did not throw
+ acquireStatus.set(true);
+ } catch (UpgradeInProgressException e) {
+ numExceptions.incrementAndGet();
+ } finally {
+ // Count down on every path so that the thread waiting on the latch is always woken up
+ latch.countDown();
+ }
+ return null;
+ }
+
+ }
private Connection createTenantConnection(String tenantId) throws SQLException {
Properties props = new Properties();
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 3b57981..3c3267f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -63,7 +63,7 @@ import com.google.protobuf.ByteString;
public abstract class MetaDataProtocol extends MetaDataService {
public static final int PHOENIX_MAJOR_VERSION = 4;
public static final int PHOENIX_MINOR_VERSION = 8;
- public static final int PHOENIX_PATCH_NUMBER = 1;
+ public static final int PHOENIX_PATCH_NUMBER = 2;
public static final int PHOENIX_VERSION =
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 17c2a1c..3d2347b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -315,6 +315,11 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
/** Version below which we fall back on the generic KeyValueBuilder */
public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
+ public static final String SYSTEM_MUTEX_TABLE_NAME = "MUTEX";
+ public static final String SYSTEM_MUTEX_NAME = SchemaUtil.getTableName(QueryConstants.SYSTEM_SCHEMA_NAME, SYSTEM_MUTEX_TABLE_NAME);
+ public static final byte[] SYSTEM_MUTEX_NAME_BYTES = Bytes.toBytes(SYSTEM_MUTEX_NAME);
+ public static final byte[] SYSTEM_MUTEX_FAMILY_NAME_BYTES = TABLE_FAMILY_BYTES;
+
PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
this.connection = connection;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 3dbbf09..ac4cf5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -32,7 +32,7 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
-import static org.apache.phoenix.util.UpgradeUtil.getUpgradeSnapshotName;
+import static org.apache.phoenix.util.UpgradeUtil.getSysCatalogSnapshotName;
import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
import java.io.IOException;
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -83,8 +84,10 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
@@ -224,6 +227,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100;
private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000;
+ private static final int TTL_FOR_MUTEX = 15 * 60; // 15min
protected final Configuration config;
private final ConnectionInfo connectionInfo;
// Copy of config.getProps(), but read-only to prevent synchronization that we
@@ -267,6 +271,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues;
private ScheduledExecutorService renewLeaseExecutor;
private final boolean renewLeaseEnabled;
+ private static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
+ private static final byte[] UPGRADE_MUTEX_VALUE = UPGRADE_MUTEX;
private static interface FeatureSupported {
boolean isSupported(ConnectionQueryServices services);
@@ -2312,6 +2318,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
String snapshotName = null;
String sysCatalogTableName = null;
boolean hConnectionEstablished = false;
+ boolean acquiredMutex = false;
+ byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+ boolean snapshotCreated = false;
try {
openConnection();
hConnectionEstablished = true;
@@ -2353,9 +2363,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (upgradeSystemTables) {
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
sysCatalogTableName = e.getTable().getPhysicalName().getString();
- if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable().getPhysicalName().getBytes())) {
- snapshotName = getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp);
+ // Lock the same row that the finally block passes to releaseUpgradeMutex(). Locking any other
+ // row (e.g. the physical table name) would leave the mutex held until its TTL expires.
+ if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP
+ && (acquiredMutex = acquireUpgradeMutex(currentServerSideTableTimeStamp, mutexRowKey))) {
+ snapshotName = getSysCatalogSnapshotName(currentServerSideTableTimeStamp);
createSnapshot(snapshotName, sysCatalogTableName);
+ snapshotCreated = true;
}
String columnsToAdd = "";
// This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
@@ -2588,7 +2601,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
} finally {
try {
- restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
+ if (snapshotCreated) {
+ restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
+ }
} catch (SQLException e) {
if (initializationException != null) {
initializationException.setNextException(e);
@@ -2609,6 +2624,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
} finally {
initialized = true;
+ if (acquiredMutex) {
+ releaseUpgradeMutex(mutexRowKey);
+ }
if (initializationException != null) {
throw initializationException;
}
@@ -2756,43 +2774,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
-
- /**
- * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
- * making use of HBase's checkAndPut api.
- * <p>
- * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the
- * version cell will be null i.e. the {@value QueryConstants#UPGRADE_MUTEX} column will be non-existent. For client's
- * upgrading to a release newer than 4.8.1 the existing version cell will be non-null. The client which
- * wins the race will end up setting the version cell to the {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP}
- * for the release.
- * </p>
- *
- * @return true if client won the race, false otherwise
- * @throws IOException
- * @throws SQLException
- */
- private boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException,
- SQLException {
- Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
- try (HTableInterface sysCatalogTable = getTable(sysCatalogTableName)) {
- byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
- byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
- byte[] qualifier = QueryConstants.UPGRADE_MUTEX;
- byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 ? null
- : PLong.INSTANCE.toBytes(currentServerSideTableTimestamp);
- byte[] newValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
- // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX column isn't used
- // to calculate SYSTEM.CATALOG's server side timestamp.
- Put put = new Put(row);
- put.add(family, qualifier, newValue);
- boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put);
- if (!acquired) { throw new UpgradeInProgressException(
- getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
- return true;
- }
- }
});
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, SQLException.class);
@@ -2800,7 +2781,63 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- private static class UpgradeInProgressException extends SQLException {
+ /**
+ * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
+ * making use of HBase's checkAndPut api. The mutex is a cell written to the given row of the
+ * {@link PhoenixDatabaseMetaData#SYSTEM_MUTEX_NAME_BYTES} table; that table is created here if it
+ * does not exist yet, with a time-to-live on its column family so that the mutex expires.
+ *
+ * @param currentServerSideTableTimestamp must be less than MIN_SYSTEM_TABLE_TIMESTAMP; besides the
+ * precondition check it is only used for the versions reported in the exception
+ * @param rowToLock row key of the mutex table on which the mutex cell is written
+ * @return true if client won the race; this method never returns false, a client that loses the
+ * race gets an UpgradeInProgressException instead
+ * @throws IOException
+ * @throws SQLException an UpgradeInProgressException if the checkAndPut on rowToLock did not succeed
+ */
+ @VisibleForTesting
+ public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] rowToLock) throws IOException,
+ SQLException {
+ Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
+ try (HBaseAdmin admin = getAdmin()) {
+ try {
+ HTableDescriptor tableDesc = new HTableDescriptor(
+ TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES));
+ HColumnDescriptor columnDesc = new HColumnDescriptor(
+ PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES);
+ columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time
+ tableDesc.addFamily(columnDesc);
+ admin.createTable(tableDesc);
+ } catch (TableExistsException e) {
+ // Ignore: the mutex table has already been created
+ }
+ }
+ try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+ byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
+ byte[] qualifier = UPGRADE_MUTEX;
+ // A null expected value makes checkAndPut succeed only if the cell does not exist yet
+ byte[] oldValue = null;
+ byte[] newValue = UPGRADE_MUTEX_VALUE;
+ Put put = new Put(rowToLock);
+ put.add(family, qualifier, newValue);
+ boolean acquired = sysMutexTable.checkAndPut(rowToLock, family, qualifier, oldValue, put);
+ if (!acquired) { throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
+ getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
+ return true;
+ }
+ }
+
+ /**
+ * Releases the upgrade mutex by deleting the given row of the mutex table, but only if its
+ * UPGRADE_MUTEX cell still equals UPGRADE_MUTEX_VALUE (HBase checkAndMutate). Any exception is
+ * logged at warn level and not propagated.
+ *
+ * @param mutexRowKey row key of the mutex to release
+ * @return the result of the checkAndMutate call, or false if an exception occurred
+ */
+ @VisibleForTesting
+ public boolean releaseUpgradeMutex(byte[] mutexRowKey) {
+ boolean released = false;
+ try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+ byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
+ byte[] qualifier = UPGRADE_MUTEX;
+ byte[] expectedValue = UPGRADE_MUTEX_VALUE;
+ Delete delete = new Delete(mutexRowKey);
+ RowMutations mutations = new RowMutations(mutexRowKey);
+ mutations.add(delete);
+ released = sysMutexTable.checkAndMutate(mutexRowKey, family, qualifier, CompareOp.EQUAL, expectedValue, mutations);
+ } catch (Exception e) {
+ // Best effort: a failed release is only logged, released stays false
+ logger.warn("Release of upgrade mutex failed", e);
+ }
+ return released;
+ }
+
+ public static class UpgradeInProgressException extends SQLException {
public UpgradeInProgressException(String upgradeFrom, String upgradeTo) {
super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo
+ ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index ace228b..b3a5a36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -362,6 +362,5 @@ public interface QueryConstants {
public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
public static final String LAST_SCAN = "LAST_SCAN";
- public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b56aa9f/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index b205c4a..9b19448 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -1894,8 +1894,9 @@ public class UpgradeUtil {
}
}
- public static final String getUpgradeSnapshotName(String tableString, long currentSystemTableTimestamp) {
- Format formatter = new SimpleDateFormat("yyyyMMddHHmmssZ");
+ public static final String getSysCatalogSnapshotName(long currentSystemTableTimestamp) {
+ String tableString = SYSTEM_CATALOG_NAME;
+ Format formatter = new SimpleDateFormat("yyyyMMddHHmmss");
String date = formatter.format(new Date(System.currentTimeMillis()));
String upgradingFrom = getVersion(currentSystemTableTimestamp);
return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date;