You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:40:51 UTC
[16/50] [abbrv] phoenix git commit: PHOENIX-1641 Make the upgrade from 4.x to 4.3 work for SYSTEM.CATALOG and SYSTEM.SEQUENCE
PHOENIX-1641 Make the upgrade from 4.x to 4.3 work for SYSTEM.CATALOG and SYSTEM.SEQUENCE
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/eaa7fbfd
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/eaa7fbfd
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/eaa7fbfd
Branch: refs/heads/calcite
Commit: eaa7fbfde9f45c635cc11d524977d54485c9ea8d
Parents: 9db37bd
Author: Samarth <sa...@salesforce.com>
Authored: Fri Feb 6 16:29:09 2015 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Fri Feb 6 16:29:09 2015 -0800
----------------------------------------------------------------------
.../query/ConnectionQueryServicesImpl.java | 69 +++++--
.../org/apache/phoenix/util/UpgradeUtil.java | 179 ++++++++++---------
2 files changed, 146 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eaa7fbfd/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 6d58f57..7763a0a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -136,6 +136,7 @@ import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PUnsignedTinyint;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.ConfigUtil;
@@ -1757,8 +1758,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
- // Keeping this to use for further upgrades
- protected PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
+ /**
+ * Keeping this to use for further upgrades. This method closes the oldMetaConnection.
+ */
+ private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
String tableName, long timestamp, String columns) throws SQLException {
Properties props = new Properties(oldMetaConnection.getClientInfo());
@@ -1826,7 +1829,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} catch (NewerTableAlreadyExistsException ignore) {
// Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp.
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
- } catch (TableAlreadyExistsException ignore) {
+ } catch (TableAlreadyExistsException e) {
+ // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
+ // any new columns we've added.
+ long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+
+ // We know that we always need to add the STORE_NULLS column for 4.3 release
+ String columnsToAdd = PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName();
+
+ // If the server side schema is 4 versions behind then we need to add INDEX_TYPE
+ // and INDEX_DISABLE_TIMESTAMP columns too.
+ // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
+ // we should just have an ALTER TABLE ADD IF NOT EXISTS statement with all
+ // the column names that have been added to SYSTEM.CATALOG since 4.0.
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 4) {
+ columnsToAdd += ", " + PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
+ + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName();
+ }
+
+ // Ugh..need to assign to another local variable to keep eclipse happy.
+ PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
+ metaConnection = newMetaConnection;
}
int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
@@ -1840,20 +1865,34 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Integer sequenceSaltBuckets = e.getTable().getBucketNum();
nSequenceSaltBuckets = sequenceSaltBuckets == null ? 0 : sequenceSaltBuckets;
} catch (TableAlreadyExistsException e) {
- // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
+ // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to include
// any new columns we've added.
- if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
- metaConnection.removeTable(null,
- PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME,
- PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
- clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
- PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES,
- PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
- clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
+ long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+ // if the table is at a timestamp corresponding to before 4.2.1 then run the upgrade script
+ if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 2) {
+ if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
+ metaConnection.removeTable(null,
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME,
+ PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+ clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES,
+ PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+ clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
+ }
+ nSequenceSaltBuckets = nSaltBuckets;
+ }
+ if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 3) {
+ // If the table time stamp is before 4.1.0 then we need to add below columns
+ // to the SYSTEM.SEQUENCE table.
+ String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
+ + ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
+ + ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName()
+ + ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName();
+ addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
}
- nSequenceSaltBuckets = nSaltBuckets;
}
try {
metaConnection.createStatement().executeUpdate(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eaa7fbfd/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index a3fee72..a92223b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -242,106 +242,110 @@ public class UpgradeUtil {
logger.info("SYSTEM.SEQUENCE table has already been upgraded");
return false;
}
+
+ // if the SYSTEM.SEQUENCE table is for 4.1.0 or before then we need to salt the table
+ if (oldTable.getTimeStamp() <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 3) {
+ int batchSizeBytes = 100 * 1024; // 100K chunks
+ int sizeBytes = 0;
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000);
- int batchSizeBytes = 100 * 1024; // 100K chunks
- int sizeBytes = 0;
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000);
-
- boolean success = false;
- Scan scan = new Scan();
- scan.setRaw(true);
- scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
- HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
- try {
- boolean committed = false;
- logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
- ResultScanner scanner = seqTable.getScanner(scan);
+ boolean success = false;
+ Scan scan = new Scan();
+ scan.setRaw(true);
+ scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
+ HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
- Result result;
- while ((result = scanner.next()) != null) {
- for (KeyValue keyValue : result.raw()) {
- KeyValue newKeyValue = addSaltByte(keyValue, nSaltBuckets);
- if (newKeyValue != null) {
- sizeBytes += newKeyValue.getLength();
- if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
- // Delete old value
- byte[] buf = keyValue.getBuffer();
- Delete delete = new Delete(keyValue.getRow());
- KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(),
- buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
- buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
- keyValue.getTimestamp(), KeyValue.Type.Delete,
- ByteUtil.EMPTY_BYTE_ARRAY,0,0);
- delete.addDeleteMarker(deleteKeyValue);
- mutations.add(delete);
- sizeBytes += deleteKeyValue.getLength();
- // Put new value
- Put put = new Put(newKeyValue.getRow());
- put.add(newKeyValue);
- mutations.add(put);
- } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
- // Copy delete marker using new key so that it continues
- // to delete the key value preceding it that will be updated
- // as well.
- Delete delete = new Delete(newKeyValue.getRow());
- delete.addDeleteMarker(newKeyValue);
- mutations.add(delete);
+ boolean committed = false;
+ logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+ ResultScanner scanner = seqTable.getScanner(scan);
+ try {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ for (KeyValue keyValue : result.raw()) {
+ KeyValue newKeyValue = addSaltByte(keyValue, nSaltBuckets);
+ if (newKeyValue != null) {
+ sizeBytes += newKeyValue.getLength();
+ if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
+ // Delete old value
+ byte[] buf = keyValue.getBuffer();
+ Delete delete = new Delete(keyValue.getRow());
+ KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(),
+ buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
+ buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
+ keyValue.getTimestamp(), KeyValue.Type.Delete,
+ ByteUtil.EMPTY_BYTE_ARRAY,0,0);
+ delete.addDeleteMarker(deleteKeyValue);
+ mutations.add(delete);
+ sizeBytes += deleteKeyValue.getLength();
+ // Put new value
+ Put put = new Put(newKeyValue.getRow());
+ put.add(newKeyValue);
+ mutations.add(put);
+ } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
+ // Copy delete marker using new key so that it continues
+ // to delete the key value preceding it that will be updated
+ // as well.
+ Delete delete = new Delete(newKeyValue.getRow());
+ delete.addDeleteMarker(newKeyValue);
+ mutations.add(delete);
+ }
+ }
+ if (sizeBytes >= batchSizeBytes) {
+ logger.info("Committing bactch of SYSTEM.SEQUENCE rows");
+ seqTable.batch(mutations);
+ mutations.clear();
+ sizeBytes = 0;
+ committed = true;
}
- }
- if (sizeBytes >= batchSizeBytes) {
- logger.info("Committing bactch of SYSTEM.SEQUENCE rows");
- seqTable.batch(mutations);
- mutations.clear();
- sizeBytes = 0;
- committed = true;
}
}
- }
- if (!mutations.isEmpty()) {
- logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
- seqTable.batch(mutations);
- }
- preSplitSequenceTable(conn, nSaltBuckets);
- logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
- success = true;
- return true;
- } catch (InterruptedException e) {
- throw ServerUtil.parseServerException(e);
- } finally {
- try {
- scanner.close();
+ if (!mutations.isEmpty()) {
+ logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
+ seqTable.batch(mutations);
+ }
+ preSplitSequenceTable(conn, nSaltBuckets);
+ logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
+ success = true;
+ return true;
+ } catch (InterruptedException e) {
+ throw ServerUtil.parseServerException(e);
} finally {
- if (!success) {
- if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything
- // Don't use Delete here as we'd never be able to change it again at this timestamp.
- KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey,
- PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
- PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
- PInteger.INSTANCE.toBytes(0));
- Put unsaltPut = new Put(seqTableKey);
- unsaltPut.add(unsaltKV);
- try {
- sysTable.put(unsaltPut);
- success = true;
- } finally {
- if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
+ try {
+ scanner.close();
+ } finally {
+ if (!success) {
+ if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything
+ // Don't use Delete here as we'd never be able to change it again at this timestamp.
+ KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey,
+ PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+ PInteger.INSTANCE.toBytes(0));
+ Put unsaltPut = new Put(seqTableKey);
+ unsaltPut.add(unsaltKV);
+ try {
+ sysTable.put(unsaltPut);
+ success = true;
+ } finally {
+ if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
+ }
+ } else { // We're screwed b/c we've already committed some salted sequences...
+ logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
}
- } else { // We're screwed b/c we've already committed some salted sequences...
- logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
}
}
}
- }
- } catch (IOException e) {
- throw ServerUtil.parseServerException(e);
- } finally {
- try {
- seqTable.close();
} catch (IOException e) {
- logger.warn("Exception during close",e);
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ seqTable.close();
+ } catch (IOException e) {
+ logger.warn("Exception during close",e);
+ }
}
}
+ return false;
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
@@ -351,6 +355,7 @@ public class UpgradeUtil {
logger.warn("Exception during close",e);
}
}
+
}
@SuppressWarnings("deprecation")