You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/05 07:49:36 UTC
[3/3] git commit: PHOENIX-1401 SYSTEM.SEQUENCE table is not pre-split as expected
PHOENIX-1401 SYSTEM.SEQUENCE table is not pre-split as expected
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cb60cb4a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cb60cb4a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cb60cb4a
Branch: refs/heads/4.0
Commit: cb60cb4a3b0181df841fef10c0365e197d96b59d
Parents: 37c4d3e
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Nov 4 10:36:28 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Nov 4 22:49:07 2014 -0800
----------------------------------------------------------------------
.../phoenix/coprocessor/MetaDataProtocol.java | 4 +-
.../query/ConnectionQueryServicesImpl.java | 12 ++--
.../apache/phoenix/schema/MetaDataClient.java | 2 +-
.../schema/TableAlreadyExistsException.java | 16 +++++-
.../org/apache/phoenix/util/UpgradeUtil.java | 58 ++++++++++++++++++--
5 files changed, 80 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 662bed3..f8349be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -53,14 +53,14 @@ import com.google.protobuf.HBaseZeroCopyByteString;
public abstract class MetaDataProtocol extends MetaDataService {
public static final int PHOENIX_MAJOR_VERSION = 4;
public static final int PHOENIX_MINOR_VERSION = 2;
- public static final int PHOENIX_PATCH_NUMBER = 0;
+ public static final int PHOENIX_PATCH_NUMBER = 1;
public static final int PHOENIX_VERSION =
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
public static final long MIN_TABLE_TIMESTAMP = 0;
// Incremented from 3 to 4 to salt the sequence table in 3.2/4.2
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 4;
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 5;
public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
// TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index aeb9ac2..eab263b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -1539,12 +1539,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} catch (TableAlreadyExistsException e) {
// This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
// any new columns we've added.
- if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets)) {
+ if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
metaConnection.removeTable(null,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
- PhoenixDatabaseMetaData.TYPE_SEQUENCE,
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME,
+ PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
- clearCache();
+ clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES,
+ PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+ clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
}
nSequenceSaltBuckets = nSaltBuckets;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 5892d14..e5951fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1532,7 +1532,7 @@ public class MetaDataClient {
case TABLE_ALREADY_EXISTS:
connection.addTable(result.getTable());
if (!statement.ifNotExists()) {
- throw new TableAlreadyExistsException(schemaName, tableName);
+ throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
}
return null;
case PARENT_TABLE_NOT_FOUND:
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
index 466b4a4..2b4eaeb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
@@ -35,16 +35,26 @@ public class TableAlreadyExistsException extends SQLException {
private static SQLExceptionCode code = SQLExceptionCode.TABLE_ALREADY_EXIST;
private final String schemaName;
private final String tableName;
+ private final PTable table;
public TableAlreadyExistsException(String schemaName, String tableName) {
- this(schemaName, tableName, null);
+ this(schemaName, tableName, null, null);
}
public TableAlreadyExistsException(String schemaName, String tableName, String msg) {
+ this(schemaName, tableName, msg, null);
+ }
+
+ public TableAlreadyExistsException(String schemaName, String tableName, PTable table) {
+ this(schemaName, tableName, null, table);
+ }
+
+ public TableAlreadyExistsException(String schemaName, String tableName, String msg, PTable table) {
super(new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName).setMessage(msg).build().toString(),
code.getSQLState(), code.getErrorCode());
this.schemaName = schemaName;
this.tableName = tableName;
+ this.table = table;
}
public String getTableName() {
@@ -54,4 +64,8 @@ public class TableAlreadyExistsException extends SQLException {
public String getSchemaName() {
return schemaName;
}
+
+ public PTable getTable() { // Returns the PTable supplied at construction; null when a constructor overload without a PTable argument was used.
+ return table;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index b51b455..21e0631 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -37,6 +38,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SaltingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,11 +52,36 @@ public class UpgradeUtil {
private UpgradeUtil() {
}
+ private static void preSplitSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException { // Requests one HBase split of SYSTEM.SEQUENCE per salt bucket; does nothing when nSaltBuckets <= 0.
+ HBaseAdmin admin = conn.getQueryServices().getAdmin(); // Acquired before the guard below, so even the early return passes through the finally that closes it.
+ try {
+ if (nSaltBuckets <= 0) {
+ return;
+ }
+ logger.warn("Pre-splitting SYSTEM.SEQUENCE table " + nSaltBuckets + "-ways"); // Start and completion are logged at warn; per-bucket progress is logged at info.
+ for (int i = 0; i < nSaltBuckets; i++) {
+ logger.info("Pre-splitting SYSTEM.SEQUENCE table for salt bucket " + i);
+ admin.split(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES, new byte[] {(byte)i}); // Second argument is the single-byte array {i}, passed as the split point for bucket i (see HBaseAdmin.split).
+ }
+ logger.warn("Completed pre-splitting SYSTEM.SEQUENCE table");
+ } catch (IOException e) { // Rethrown to callers as SQLException with the original exception as its cause.
+ throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e);
+ } catch (InterruptedException e) { // NOTE(review): the thread's interrupt status is not restored before rethrowing - confirm whether that is intended.
+ throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e);
+ } finally {
+ try {
+ admin.close();
+ } catch (IOException e) { // A failed close is logged and not rethrown.
+ logger.warn("Exception while closing admin during pre-split", e);
+ }
+ }
+ }
+
@SuppressWarnings("deprecation")
- public static boolean upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException {
+ public static boolean upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets, PTable oldTable) throws SQLException {
logger.info("Upgrading SYSTEM.SEQUENCE table");
- byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE);
+ byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
HTableInterface sysTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
try {
logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " + SaltingUtil.MAX_BUCKET_NUM);
@@ -69,7 +96,29 @@ public class UpgradeUtil {
if (!sysTable.checkAndPut(seqTableKey,
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, saltPut)) {
-
+ if (oldTable == null) { // Unexpected, but to be safe just run pre-split code
+ preSplitSequenceTable(conn, nSaltBuckets);
+ return true;
+ }
+ // We can detect upgrade from 4.2.0 -> 4.2.1 based on the timestamp of the table row
+ if (oldTable.getTimeStamp() == MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP-1) {
+ byte[] oldSeqNum = PDataType.LONG.toBytes(oldTable.getSequenceNumber());
+ KeyValue seqNumKV = KeyValueUtil.newKeyValue(seqTableKey,
+ PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+ PDataType.LONG.toBytes(oldTable.getSequenceNumber()+1));
+ Put seqNumPut = new Put(seqTableKey);
+ seqNumPut.add(seqNumKV);
+ // Increment TABLE_SEQ_NUM in checkAndPut as semaphore so that only single client
+ // pre-splits the sequence table.
+ if (sysTable.checkAndPut(seqTableKey,
+ PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, oldSeqNum, seqNumPut)) {
+ preSplitSequenceTable(conn, nSaltBuckets);
+ return true;
+ }
+ }
logger.info("SYSTEM.SEQUENCE table has already been upgraded");
return false;
}
@@ -85,7 +134,7 @@ public class UpgradeUtil {
HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
boolean committed = false;
- logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+ logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
ResultScanner scanner = seqTable.getScanner(scan);
try {
Result result;
@@ -132,6 +181,7 @@ public class UpgradeUtil {
logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
seqTable.batch(mutations);
}
+ preSplitSequenceTable(conn, nSaltBuckets);
logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
success = true;
return true;