You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/05 07:49:36 UTC

[3/3] git commit: PHOENIX-1401 SYSTEM.SEQUENCE table is not pre-split as expected

PHOENIX-1401 SYSTEM.SEQUENCE table is not pre-split as expected


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cb60cb4a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cb60cb4a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cb60cb4a

Branch: refs/heads/4.0
Commit: cb60cb4a3b0181df841fef10c0365e197d96b59d
Parents: 37c4d3e
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Nov 4 10:36:28 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Nov 4 22:49:07 2014 -0800

----------------------------------------------------------------------
 .../phoenix/coprocessor/MetaDataProtocol.java   |  4 +-
 .../query/ConnectionQueryServicesImpl.java      | 12 ++--
 .../apache/phoenix/schema/MetaDataClient.java   |  2 +-
 .../schema/TableAlreadyExistsException.java     | 16 +++++-
 .../org/apache/phoenix/util/UpgradeUtil.java    | 58 ++++++++++++++++++--
 5 files changed, 80 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 662bed3..f8349be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -53,14 +53,14 @@ import com.google.protobuf.HBaseZeroCopyByteString;
 public abstract class MetaDataProtocol extends MetaDataService {
     public static final int PHOENIX_MAJOR_VERSION = 4;
     public static final int PHOENIX_MINOR_VERSION = 2;
-    public static final int PHOENIX_PATCH_NUMBER = 0;
+    public static final int PHOENIX_PATCH_NUMBER = 1;
     public static final int PHOENIX_VERSION = 
             VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
     
     public static final long MIN_TABLE_TIMESTAMP = 0;
 
     // Incremented from 3 to 4 to salt the sequence table in 3.2/4.2
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 4;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 5;
     public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
 
     // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index aeb9ac2..eab263b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -1539,12 +1539,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             } catch (TableAlreadyExistsException e) {
                                 // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
                                 // any new columns we've added.
-                                if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets)) {
+                                if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
                                     metaConnection.removeTable(null,
-                                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
-                                            PhoenixDatabaseMetaData.TYPE_SEQUENCE,
+                                            PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME,
+                                            PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
                                             MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
-                                    clearCache();
+                                    clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, 
+                                            PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES,
+                                            PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES,
+                                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+                                    clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
                                 }
                                 nSequenceSaltBuckets = nSaltBuckets;
                             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 5892d14..e5951fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1532,7 +1532,7 @@ public class MetaDataClient {
             case TABLE_ALREADY_EXISTS:
                 connection.addTable(result.getTable());
                 if (!statement.ifNotExists()) {
-                    throw new TableAlreadyExistsException(schemaName, tableName);
+                    throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
                 }
                 return null;
             case PARENT_TABLE_NOT_FOUND:

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
index 466b4a4..2b4eaeb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
@@ -35,16 +35,26 @@ public class TableAlreadyExistsException extends SQLException {
     private static SQLExceptionCode code = SQLExceptionCode.TABLE_ALREADY_EXIST;
     private final String schemaName;
     private final String tableName;
+    private final PTable table;
 
     public TableAlreadyExistsException(String schemaName, String tableName) {
-        this(schemaName, tableName, null);
+        this(schemaName, tableName, null, null);
     }
 
     public TableAlreadyExistsException(String schemaName, String tableName, String msg) {
+        this(schemaName, tableName, msg, null);
+    }
+
+    public TableAlreadyExistsException(String schemaName, String tableName, PTable table) {
+        this(schemaName, tableName, null, table);
+    }
+
+    public TableAlreadyExistsException(String schemaName, String tableName, String msg, PTable table) {
         super(new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName).setMessage(msg).build().toString(),
                 code.getSQLState(), code.getErrorCode());
         this.schemaName = schemaName;
         this.tableName = tableName;
+        this.table = table;
     }
 
     public String getTableName() {
@@ -54,4 +64,8 @@ public class TableAlreadyExistsException extends SQLException {
     public String getSchemaName() {
         return schemaName;
     }
+    
+    public PTable getTable() {
+        return table; // may be null: the constructors that take no PTable argument pass null
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index b51b455..21e0631 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -37,6 +38,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,11 +52,36 @@ public class UpgradeUtil {
     private UpgradeUtil() {
     }
 
+    // Requests one HBase split of SYSTEM.SEQUENCE per salt bucket, using the single byte i as the split point; failures surface as SQLException.
+    private static void preSplitSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException {
+        if (nSaltBuckets <= 0) { return; } // no buckets, nothing to split: skip acquiring an HBaseAdmin
+        HBaseAdmin admin = conn.getQueryServices().getAdmin();
+        try {
+            logger.warn("Pre-splitting SYSTEM.SEQUENCE table " + nSaltBuckets + "-ways");
+            for (int i = 0; i < nSaltBuckets; i++) {
+                logger.info("Pre-splitting SYSTEM.SEQUENCE table for salt bucket " + i);
+                admin.split(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES, new byte[] {(byte)i});
+            }
+            logger.warn("Completed pre-splitting SYSTEM.SEQUENCE table");
+        } catch (IOException e) {
+            throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt status so callers up the stack can still observe it
+            throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e);
+        } finally {
+            try {
+                admin.close(); // best-effort close: a failure here is logged, not rethrown
+            } catch (IOException e) {
+                logger.warn("Exception while closing admin during pre-split", e);
+            }
+        }
+    }
+    
     @SuppressWarnings("deprecation")
-    public static boolean upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException {
+    public static boolean upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets, PTable oldTable) throws SQLException {
         logger.info("Upgrading SYSTEM.SEQUENCE table");
 
-        byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE);
+        byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
         HTableInterface sysTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
         try {
             logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " + SaltingUtil.MAX_BUCKET_NUM);
@@ -69,7 +96,29 @@ public class UpgradeUtil {
             if (!sysTable.checkAndPut(seqTableKey,
                     PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, saltPut)) {
-
+                if (oldTable == null) { // Unexpected, but to be safe just run pre-split code
+                    preSplitSequenceTable(conn, nSaltBuckets);
+                    return true;
+                }
+                // We can detect upgrade from 4.2.0 -> 4.2.1 based on the timestamp of the table row
+                if (oldTable.getTimeStamp() == MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP-1) {
+                    byte[] oldSeqNum = PDataType.LONG.toBytes(oldTable.getSequenceNumber());
+                    KeyValue seqNumKV = KeyValueUtil.newKeyValue(seqTableKey, 
+                            PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+                            PDataType.LONG.toBytes(oldTable.getSequenceNumber()+1));
+                    Put seqNumPut = new Put(seqTableKey);
+                    seqNumPut.add(seqNumKV);
+                    // Increment TABLE_SEQ_NUM in checkAndPut as semaphore so that only single client
+                    // pre-splits the sequence table.
+                    if (sysTable.checkAndPut(seqTableKey,
+                            PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, oldSeqNum, seqNumPut)) {
+                        preSplitSequenceTable(conn, nSaltBuckets);
+                        return true;
+                    }
+                }
                 logger.info("SYSTEM.SEQUENCE table has already been upgraded");
                 return false;
             }
@@ -85,7 +134,7 @@ public class UpgradeUtil {
             HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
             try {
                 boolean committed = false;
-               logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+                logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
                 ResultScanner scanner = seqTable.getScanner(scan);
                 try {
                     Result result;
@@ -132,6 +181,7 @@ public class UpgradeUtil {
                         logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
                         seqTable.batch(mutations);
                     }
+                    preSplitSequenceTable(conn, nSaltBuckets);
                     logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
                     success = true;
                     return true;