You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:40:51 UTC

[16/50] [abbrv] phoenix git commit: PHOENIX-1641 Make the upgrade from 4.x to 4.3 work for SYSTEM.CATALOG and SYSTEM.SEQUENCE

PHOENIX-1641 Make the upgrade from 4.x to 4.3 work for  SYSTEM.CATALOG and SYSTEM.SEQUENCE


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/eaa7fbfd
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/eaa7fbfd
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/eaa7fbfd

Branch: refs/heads/calcite
Commit: eaa7fbfde9f45c635cc11d524977d54485c9ea8d
Parents: 9db37bd
Author: Samarth <sa...@salesforce.com>
Authored: Fri Feb 6 16:29:09 2015 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Fri Feb 6 16:29:09 2015 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      |  69 +++++--
 .../org/apache/phoenix/util/UpgradeUtil.java    | 179 ++++++++++---------
 2 files changed, 146 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/eaa7fbfd/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 6d58f57..7763a0a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -136,6 +136,7 @@ import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
@@ -1757,8 +1758,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     }
 
-    // Keeping this to use for further upgrades
-    protected PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
+    /** 
+     * Keeping this to use for further upgrades. This method closes the oldMetaConnection.
+     */
+    private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
         String tableName, long timestamp, String columns) throws SQLException {
 
         Properties props = new Properties(oldMetaConnection.getClientInfo());
@@ -1826,7 +1829,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             } catch (NewerTableAlreadyExistsException ignore) {
                                 // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp.
                                 // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
-                            } catch (TableAlreadyExistsException ignore) {
+                            } catch (TableAlreadyExistsException e) {
+                                // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
+                                // any new columns we've added.
+                                long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+                                
+                                // We know that we always need to add the STORE_NULLS column for 4.3 release
+                                String columnsToAdd = PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName();
+                                
+                                // If the server side schema is 4 versions behind then we need to add INDEX_TYPE
+                                // and INDEX_DISABLE_TIMESTAMP columns too.
+                                // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
+                                // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all
+                                // the column names that have been added to SYSTEM.CATALOG since 4.0.
+                                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 4) {
+                                    columnsToAdd += ", " + PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
+                                            + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName();
+                                }
+                                
+                                // Ugh..need to assign to another local variable to keep eclipse happy.
+                                PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
+                                        PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
+                                metaConnection = newMetaConnection;
                             }
                             int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
                                     QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
@@ -1840,20 +1865,34 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 Integer sequenceSaltBuckets = e.getTable().getBucketNum();
                                 nSequenceSaltBuckets = sequenceSaltBuckets == null ? 0 : sequenceSaltBuckets;
                             } catch (TableAlreadyExistsException e) {
-                                // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
+                                // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to include
                                 // any new columns we've added.
-                                if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
-                                    metaConnection.removeTable(null,
-                                            PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME,
-                                            PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
-                                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
-                                    clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
-                                            PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES,
-                                            PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES,
-                                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
-                                    clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
+                                long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
+                                // if the table is at a timestamp corresponding to before 4.2.1 then run the upgrade script
+                                if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 2) {
+                                    if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
+                                        metaConnection.removeTable(null,
+                                                PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME,
+                                                PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
+                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+                                        clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
+                                                PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES,
+                                                PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES,
+                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+                                        clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
+                                    }
+                                    nSequenceSaltBuckets = nSaltBuckets;
+                                } 
+                                if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 3) {
+                                    // If the table time stamp is before 4.1.0 then we need to add below columns
+                                    // to the SYSTEM.SEQUENCE table.
+                                    String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName() 
+                                            + ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
+                                            + ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName()
+                                            + ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName();
+                                    addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
                                 }
-                                nSequenceSaltBuckets = nSaltBuckets;
                             }
                             try {
                                 metaConnection.createStatement().executeUpdate(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eaa7fbfd/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index a3fee72..a92223b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -242,106 +242,110 @@ public class UpgradeUtil {
                 logger.info("SYSTEM.SEQUENCE table has already been upgraded");
                 return false;
             }
+            
+            // if the SYSTEM.SEQUENCE table is for 4.1.0 or before then we need to salt the table
+            if (oldTable.getTimeStamp() <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 3) {
+                int batchSizeBytes = 100 * 1024; // 100K chunks
+                int sizeBytes = 0;
+                List<Mutation> mutations =  Lists.newArrayListWithExpectedSize(10000);
 
-            int batchSizeBytes = 100 * 1024; // 100K chunks
-            int sizeBytes = 0;
-            List<Mutation> mutations =  Lists.newArrayListWithExpectedSize(10000);
-    
-            boolean success = false;
-            Scan scan = new Scan();
-            scan.setRaw(true);
-            scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
-            HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
-            try {
-                boolean committed = false;
-                logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
-                ResultScanner scanner = seqTable.getScanner(scan);
+                boolean success = false;
+                Scan scan = new Scan();
+                scan.setRaw(true);
+                scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
+                HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
                 try {
-                    Result result;
-                     while ((result = scanner.next()) != null) {
-                        for (KeyValue keyValue : result.raw()) {
-                            KeyValue newKeyValue = addSaltByte(keyValue, nSaltBuckets);
-                            if (newKeyValue != null) {
-                                sizeBytes += newKeyValue.getLength();
-                                if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
-                                    // Delete old value
-                                    byte[] buf = keyValue.getBuffer();
-                                    Delete delete = new Delete(keyValue.getRow());
-                                    KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(),
-                                            buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
-                                            buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
-                                            keyValue.getTimestamp(), KeyValue.Type.Delete,
-                                            ByteUtil.EMPTY_BYTE_ARRAY,0,0);
-                                    delete.addDeleteMarker(deleteKeyValue);
-                                    mutations.add(delete);
-                                    sizeBytes += deleteKeyValue.getLength();
-                                    // Put new value
-                                    Put put = new Put(newKeyValue.getRow());
-                                    put.add(newKeyValue);
-                                    mutations.add(put);
-                                } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
-                                    // Copy delete marker using new key so that it continues
-                                    // to delete the key value preceding it that will be updated
-                                    // as well.
-                                    Delete delete = new Delete(newKeyValue.getRow());
-                                    delete.addDeleteMarker(newKeyValue);
-                                    mutations.add(delete);
+                    boolean committed = false;
+                    logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+                    ResultScanner scanner = seqTable.getScanner(scan);
+                    try {
+                        Result result;
+                        while ((result = scanner.next()) != null) {
+                            for (KeyValue keyValue : result.raw()) {
+                                KeyValue newKeyValue = addSaltByte(keyValue, nSaltBuckets);
+                                if (newKeyValue != null) {
+                                    sizeBytes += newKeyValue.getLength();
+                                    if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
+                                        // Delete old value
+                                        byte[] buf = keyValue.getBuffer();
+                                        Delete delete = new Delete(keyValue.getRow());
+                                        KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(),
+                                                buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
+                                                buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
+                                                keyValue.getTimestamp(), KeyValue.Type.Delete,
+                                                ByteUtil.EMPTY_BYTE_ARRAY,0,0);
+                                        delete.addDeleteMarker(deleteKeyValue);
+                                        mutations.add(delete);
+                                        sizeBytes += deleteKeyValue.getLength();
+                                        // Put new value
+                                        Put put = new Put(newKeyValue.getRow());
+                                        put.add(newKeyValue);
+                                        mutations.add(put);
+                                    } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
+                                        // Copy delete marker using new key so that it continues
+                                        // to delete the key value preceding it that will be updated
+                                        // as well.
+                                        Delete delete = new Delete(newKeyValue.getRow());
+                                        delete.addDeleteMarker(newKeyValue);
+                                        mutations.add(delete);
+                                    }
+                                }
+                                if (sizeBytes >= batchSizeBytes) {
+                                    logger.info("Committing bactch of SYSTEM.SEQUENCE rows");
+                                    seqTable.batch(mutations);
+                                    mutations.clear();
+                                    sizeBytes = 0;
+                                    committed = true;
                                 }
-                            }
-                            if (sizeBytes >= batchSizeBytes) {
-                                logger.info("Committing bactch of SYSTEM.SEQUENCE rows");
-                                seqTable.batch(mutations);
-                                mutations.clear();
-                                sizeBytes = 0;
-                                committed = true;
                             }
                         }
-                    }
-                    if (!mutations.isEmpty()) {
-                        logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
-                        seqTable.batch(mutations);
-                    }
-                    preSplitSequenceTable(conn, nSaltBuckets);
-                    logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
-                    success = true;
-                    return true;
-                } catch (InterruptedException e) {
-                    throw ServerUtil.parseServerException(e);
-                } finally {
-                    try {
-                        scanner.close();
+                        if (!mutations.isEmpty()) {
+                            logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
+                            seqTable.batch(mutations);
+                        }
+                        preSplitSequenceTable(conn, nSaltBuckets);
+                        logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
+                        success = true;
+                        return true;
+                    } catch (InterruptedException e) {
+                        throw ServerUtil.parseServerException(e);
                     } finally {
-                        if (!success) {
-                            if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything
-                                // Don't use Delete here as we'd never be able to change it again at this timestamp.
-                                KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey, 
-                                        PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
-                                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
-                                        PInteger.INSTANCE.toBytes(0));
-                                Put unsaltPut = new Put(seqTableKey);
-                                unsaltPut.add(unsaltKV);
-                                try {
-                                    sysTable.put(unsaltPut);
-                                    success = true;
-                                } finally {
-                                    if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
+                        try {
+                            scanner.close();
+                        } finally {
+                            if (!success) {
+                                if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything
+                                    // Don't use Delete here as we'd never be able to change it again at this timestamp.
+                                    KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey, 
+                                            PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                                            PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
+                                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+                                            PInteger.INSTANCE.toBytes(0));
+                                    Put unsaltPut = new Put(seqTableKey);
+                                    unsaltPut.add(unsaltKV);
+                                    try {
+                                        sysTable.put(unsaltPut);
+                                        success = true;
+                                    } finally {
+                                        if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
+                                    }
+                                } else { // We're screwed b/c we've already committed some salted sequences...
+                                    logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
                                 }
-                            } else { // We're screwed b/c we've already committed some salted sequences...
-                                logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
                             }
                         }
                     }
-                }
-            } catch (IOException e) {
-                throw ServerUtil.parseServerException(e);
-            } finally {
-                try {
-                    seqTable.close();
                 } catch (IOException e) {
-                    logger.warn("Exception during close",e);
+                    throw ServerUtil.parseServerException(e);
+                } finally {
+                    try {
+                        seqTable.close();
+                    } catch (IOException e) {
+                        logger.warn("Exception during close",e);
+                    }
                 }
             }
+            return false;
         } catch (IOException e) {
             throw ServerUtil.parseServerException(e);
         } finally {
@@ -351,6 +355,7 @@ public class UpgradeUtil {
                 logger.warn("Exception during close",e);
             }
         }
+        
     }
     
     @SuppressWarnings("deprecation")