You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/09/09 02:42:07 UTC
phoenix git commit: PHOENIX-3230 Upgrade code running concurrently on
different JVMs could make clients unusuable
Repository: phoenix
Updated Branches:
refs/heads/4.8-HBase-0.98 f754ea525 -> 0fddad0f3
PHOENIX-3230 Upgrade code running concurrently on different JVMs could make clients unusuable
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0fddad0f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0fddad0f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0fddad0f
Branch: refs/heads/4.8-HBase-0.98
Commit: 0fddad0f3dfad8e601c61ba97c1f1de3e3a5e054
Parents: f754ea5
Author: Samarth <sa...@salesforce.com>
Authored: Thu Sep 8 19:40:45 2016 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Sep 8 19:41:29 2016 -0700
----------------------------------------------------------------------
.../phoenix/coprocessor/MetaDataProtocol.java | 20 +-
.../phoenix/exception/SQLExceptionCode.java | 1 +
.../query/ConnectionQueryServicesImpl.java | 240 ++++++++++++-------
.../apache/phoenix/query/QueryConstants.java | 1 +
.../org/apache/phoenix/util/UpgradeUtil.java | 5 +-
5 files changed, 166 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fddad0f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index dce89bd..20922e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -19,9 +19,9 @@ package org.apache.phoenix.coprocessor;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
@@ -29,7 +29,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
import org.apache.phoenix.hbase.index.util.VersionUtil;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.schema.PColumn;
@@ -64,7 +63,7 @@ import com.google.protobuf.ByteString;
public abstract class MetaDataProtocol extends MetaDataService {
public static final int PHOENIX_MAJOR_VERSION = 4;
public static final int PHOENIX_MINOR_VERSION = 8;
- public static final int PHOENIX_PATCH_NUMBER = 0;
+ public static final int PHOENIX_PATCH_NUMBER = 1;
public static final int PHOENIX_VERSION =
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
@@ -89,7 +88,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
// ALWAYS update this map whenever rolling out a new release (major, minor or patch release).
// Key is the SYSTEM.CATALOG timestamp for the version and value is the version string.
- public static final Map<Long, String> TIMESTAMP_VERSION_MAP = new HashMap<>(10);
+ private static final NavigableMap<Long, String> TIMESTAMP_VERSION_MAP = new TreeMap<>();
static {
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0, "4.1.x");
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0, "4.2.0");
@@ -100,6 +99,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, "4.7.x");
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, "4.8.x");
}
+
public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER;
// TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
@@ -401,4 +401,14 @@ public abstract class MetaDataProtocol extends MetaDataService {
return schema;
}
}
+
+ public static String getVersion(long serverTimestamp) {
+ /*
+ * It is possible that when clients are trying to run upgrades concurrently, we could be at an intermediate
+ * server timestamp. Using floorKey provides us a range based lookup where the timestamp range for a release is
+ * [timeStampForRelease, timestampForNextRelease).
+ */
+ String version = TIMESTAMP_VERSION_MAP.get(TIMESTAMP_VERSION_MAP.floorKey(serverTimestamp));
+ return version;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fddad0f/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5a8fffa..0ccecae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -370,6 +370,7 @@ public enum SQLExceptionCode {
OUTDATED_JARS(2007, "INT09", "Outdated jars."),
INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index metadata. "),
UNKNOWN_ERROR_CODE(2009, "INT11", "Unknown error code."),
+ CONCURRENT_UPGRADE_IN_PROGRESS(2010, "INT12", ""),
OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out.", new Factory() {
@Override
public SQLException newException(SQLExceptionInfo info) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fddad0f/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index c47b420..3be376c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -19,10 +19,13 @@ package org.apache.phoenix.query;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.CURRENT_CLIENT_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
@@ -75,9 +78,11 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@@ -206,6 +211,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
@@ -2322,140 +2328,141 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (upgradeSystemTables) {
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
sysCatalogTableName = e.getTable().getPhysicalName().getString();
- if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) {
+ if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP && acquireUpgradeMutex(currentServerSideTableTimeStamp)) {
snapshotName = getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp);
createSnapshot(snapshotName, sysCatalogTableName);
}
- String columnsToAdd = "";
- // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
- // any new columns we've added.
- if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
- // We know that we always need to add the STORE_NULLS column for 4.3 release
- columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName());
- try (HBaseAdmin admin = getAdmin()) {
- HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
- for (HTableDescriptor table : localIndexTables) {
- if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
- && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
- table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
+ String columnsToAdd = "";
+ // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
+ // any new columns we've added.
+ if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
+ // We know that we always need to add the STORE_NULLS column for 4.3 release
+ columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName());
+ try (HBaseAdmin admin = getAdmin()) {
+ HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
+ for (HTableDescriptor table : localIndexTables) {
+ if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
+ && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
+ table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
MetaDataUtil.getUserTableName(table
- .getNameAsString()));
- // Explicitly disable, modify and enable the table to ensure co-location of data
- // and index regions. If we just modify the table descriptor when online schema
- // change enabled may reopen the region in same region server instead of following data region.
- admin.disableTable(table.getTableName());
- admin.modifyTable(table.getTableName(), table);
- admin.enableTable(table.getTableName());
- }
+ .getNameAsString()));
+ // Explicitly disable, modify and enable the table to ensure co-location of data
+ // and index regions. If we just modify the table descriptor when online schema
+ // change enabled may reopen the region in same region server instead of following data region.
+ admin.disableTable(table.getTableName());
+ admin.modifyTable(table.getTableName(), table);
+ admin.enableTable(table.getTableName());
}
}
}
+ }
- // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
- // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
- // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
- // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all
- // the column names that have been added to SYSTEM.CATALOG since 4.0.
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
- columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
+ // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
+ // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
+ // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
+ // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all
+ // the column names that have been added to SYSTEM.CATALOG since 4.0.
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
+ columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
+ ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName());
- }
+ }
- // If we have some new columns from 4.1-4.3 to add, add them now.
- if (!columnsToAdd.isEmpty()) {
- // Ugh..need to assign to another local variable to keep eclipse happy.
- PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
+ // If we have some new columns from 4.1-4.3 to add, add them now.
+ if (!columnsToAdd.isEmpty()) {
+ // Ugh..need to assign to another local variable to keep eclipse happy.
+ PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
- metaConnection = newMetaConnection;
- }
+ metaConnection = newMetaConnection;
+ }
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
- columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
- + PInteger.INSTANCE.getSqlTypeName();
- try {
- metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
+ columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
+ + PInteger.INSTANCE.getSqlTypeName();
+ try {
+ metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false);
- upgradeTo4_5_0(metaConnection);
- } catch (ColumnAlreadyExistsException ignored) {
- /*
- * Upgrade to 4.5 is a slightly special case. We use the fact that the column
- * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that
- * the server side upgrade has finished or is in progress.
- */
- logger.debug("No need to run 4.5 upgrade");
+ upgradeTo4_5_0(metaConnection);
+ } catch (ColumnAlreadyExistsException ignored) {
+ /*
+ * Upgrade to 4.5 is a slightly special case. We use the fact that the column
+ * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that
+ * the server side upgrade has finished or is in progress.
+ */
+ logger.debug("No need to run 4.5 upgrade");
+ }
+ Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+ props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+ props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+ PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache());
+ try {
+ List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn);
+ if (!tablesNeedingUpgrade.isEmpty()) {
+ logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
}
- Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
- props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
- props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
- PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache());
- try {
- List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn);
- if (!tablesNeedingUpgrade.isEmpty()) {
- logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
- }
- List<String> unsupportedTables = UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn);
- if (!unsupportedTables.isEmpty()) {
- logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables));
- }
- } catch (Exception ex) {
- logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex);
- } finally {
- conn.close();
+ List<String> unsupportedTables = UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn);
+ if (!unsupportedTables.isEmpty()) {
+ logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables));
}
+ } catch (Exception ex) {
+ logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex);
+ } finally {
+ conn.close();
}
- // Add these columns one at a time, each with different timestamps so that if folks have
- // run the upgrade code already for a snapshot, we'll still enter this block (and do the
- // parts we haven't yet done).
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
- columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName();
- metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ }
+ // Add these columns one at a time, each with different timestamps so that if folks have
+ // run the upgrade code already for a snapshot, we'll still enter this block (and do the
+ // parts we haven't yet done).
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
+ columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName();
+ metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
- }
- if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
- // Drop old stats table so that new stats table is created
- metaConnection = dropStatsTable(metaConnection,
+ }
+ if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
+ // Drop old stats table so that new stats table is created
+ metaConnection = dropStatsTable(metaConnection,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
- metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + PLong.INSTANCE.getSqlTypeName());
- metaConnection = setImmutableTableIndexesImmutable(metaConnection,
+ metaConnection = setImmutableTableIndexesImmutable(metaConnection,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
- metaConnection = updateSystemCatalogTimestamp(metaConnection,
+ metaConnection = updateSystemCatalogTimestamp(metaConnection,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
- ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
- clearCache();
- }
+ ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
+ clearCache();
+ }
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
- metaConnection = addColumnsIfNotExists(metaConnection,
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
+ metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
+ PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(metaConnection,
+ metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
+ PVarchar.INSTANCE.getSqlTypeName());
- metaConnection = addColumnsIfNotExists(metaConnection,
+ metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
+ PBoolean.INSTANCE.getSqlTypeName());
- metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
- if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
+ metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
+ if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
- metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
- }
- ConnectionQueryServicesImpl.this.removeTable(null,
+ metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
+ }
+ ConnectionQueryServicesImpl.this.removeTable(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
- clearCache();
- }
+ clearCache();
+ }
+
}
}
@@ -2535,6 +2542,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
success = true;
scheduleRenewLeaseTasks();
+ } catch (UpgradeInProgressException e) {
+ // don't set it as initializationException because otherwise client won't be able to retry
+ throw e;
} catch (Exception e) {
if (e instanceof SQLException) {
initializationException = (SQLException)e;
@@ -2710,12 +2720,56 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
+
+ /**
+ * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
+ * making use of HBase's checkAndPut api.
+ * <p>
+ * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the
+ * version cell will be null i.e. the QueryConstants#VERSION column will be non-existent. For client's
+ * upgrading to a release newer than 4.8.1 the existing version cell will be non-null. The client which
+ * wins the race will end up setting the version cell to the MetadataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP
+ * for the release.
+ * </p>
+ *
+ * @return true if client won the race, false otherwise
+ * @throws IOException
+ * @throws SQLException
+ */
+ private boolean acquireUpgradeMutex(long currentServerSideTableTimestamp) throws IOException,
+ SQLException {
+ Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
+ try (HTableInterface sysCatalogTable = getTable(SYSTEM_CATALOG_NAME_BYTES)) {
+ byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+ byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+ byte[] qualifier = QueryConstants.VERSION;
+ byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 ? null
+ : Bytes.toBytes(currentServerSideTableTimestamp);
+ byte[] newValue = Bytes.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
+ long ts = MIN_SYSTEM_TABLE_TIMESTAMP;
+ Put put = new Put(row, ts);
+ put.add(family, qualifier, newValue);
+ boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put);
+ if (!acquired) { throw new UpgradeInProgressException(
+ getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
+ return true;
+ }
+ }
});
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, SQLException.class);
throw Throwables.propagate(e);
}
}
+
+ private static class UpgradeInProgressException extends SQLException {
+ public UpgradeInProgressException(String upgradeFrom, String upgradeTo) {
+ super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo
+ + ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS
+ .getSQLState(), SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode());
+ }
+ }
private List<String> getTableNames(List<HTableDescriptor> tables) {
List<String> tableNames = new ArrayList<String>(4);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fddad0f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9f8f58c..3077943 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -356,5 +356,6 @@ public interface QueryConstants {
public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
public static final String LAST_SCAN = "LAST_SCAN";
+ public static final byte[] VERSION = "VERSION".getBytes();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fddad0f/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index b2e8bbc..b205c4a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.util;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.CURRENT_CLIENT_VERSION;
-import static org.apache.phoenix.coprocessor.MetaDataProtocol.TIMESTAMP_VERSION_MAP;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
@@ -31,7 +31,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
@@ -1898,7 +1897,7 @@ public class UpgradeUtil {
public static final String getUpgradeSnapshotName(String tableString, long currentSystemTableTimestamp) {
Format formatter = new SimpleDateFormat("yyyyMMddHHmmssZ");
String date = formatter.format(new Date(System.currentTimeMillis()));
- String upgradingFrom = TIMESTAMP_VERSION_MAP.get(currentSystemTableTimestamp);
+ String upgradingFrom = getVersion(currentSystemTableTimestamp);
return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date;
}
}
\ No newline at end of file