You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/17 21:16:20 UTC
git commit: PHOENIX-1365 Make sequence salt buckets configurable
Repository: phoenix
Updated Branches:
refs/heads/master d6b71e896 -> f4a1087e4
PHOENIX-1365 Make sequence salt buckets configurable
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f4a1087e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f4a1087e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f4a1087e
Branch: refs/heads/master
Commit: f4a1087e43eded9f71ce9af9c41c47b9fc7c36a0
Parents: d6b71e8
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Oct 17 12:21:44 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Oct 17 12:21:44 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/index/LocalIndexIT.java | 2 +-
.../phoenix/end2end/index/ViewIndexIT.java | 2 +-
.../apache/phoenix/compile/SequenceManager.java | 3 +-
.../coprocessor/MetaDataEndpointImpl.java | 3 +
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 15 +-
.../phoenix/query/ConnectionQueryServices.java | 1 +
.../query/ConnectionQueryServicesImpl.java | 36 ++--
.../query/ConnectionlessQueryServicesImpl.java | 14 +-
.../query/DelegateConnectionQueryServices.java | 5 +
.../apache/phoenix/query/QueryConstants.java | 5 +-
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../phoenix/query/QueryServicesOptions.java | 13 ++
.../apache/phoenix/schema/MetaDataClient.java | 3 +-
.../org/apache/phoenix/schema/Sequence.java | 7 +
.../org/apache/phoenix/schema/SequenceKey.java | 8 +-
.../org/apache/phoenix/util/MetaDataUtil.java | 9 +-
.../org/apache/phoenix/util/SchemaUtil.java | 4 +-
.../org/apache/phoenix/util/UpgradeUtil.java | 169 +++++++++++--------
.../java/org/apache/phoenix/query/BaseTest.java | 2 +-
.../phoenix/query/QueryServicesTestImpl.java | 9 +-
20 files changed, 202 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index ef3dc77..2478317 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -161,7 +161,7 @@ public class LocalIndexIT extends BaseIndexIT {
ResultSet rs = conn2.createStatement().executeQuery("SELECT "
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + ","
+ PhoenixDatabaseMetaData.SEQUENCE_NAME
- + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
+ + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED);
assertFalse("View index sequences should be deleted.", rs.next());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
index 2503933..d4ceff4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
@@ -82,7 +82,7 @@ public class ViewIndexIT extends BaseIndexIT {
ResultSet rs = conn2.createStatement().executeQuery("SELECT "
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + ","
+ PhoenixDatabaseMetaData.SEQUENCE_NAME
- + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
+ + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED);
assertFalse("View index sequences should be deleted.", rs.next());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index 03091c4..9ea4245 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -122,7 +122,8 @@ public class SequenceManager {
PName tenantName = statement.getConnection().getTenantId();
String tenantId = tenantName == null ? null : tenantName.getString();
TableName tableName = node.getTableName();
- SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName());
+ int nSaltBuckets = statement.getConnection().getQueryServices().getSequenceSaltBuckets();
+ SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName(), nSaltBuckets);
SequenceValueExpression expression = sequenceMap.get(key);
if (expression == null) {
int index = sequenceMap.size();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 62cf8bf..ebe8a7c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -544,6 +544,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
Integer saltBucketNum =
saltBucketNumKv != null ? (Integer) PDataType.INTEGER.getCodec().decodeInt(
saltBucketNumKv.getValueArray(), saltBucketNumKv.getValueOffset(), SortOrder.getDefault()) : null;
+ if (saltBucketNum != null && saltBucketNum.intValue() == 0) {
+ saltBucketNum = null; // Zero salt buckets means not salted
+ }
Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
PName dataTableName =
dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 9dd41f3..7a1f2be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -100,12 +100,12 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
public static final int TENANT_ID_INDEX = 0;
public static final String SYSTEM_CATALOG_SCHEMA = QueryConstants.SYSTEM_SCHEMA_NAME;
+ public static final byte[] SYSTEM_CATALOG_SCHEMA_BYTES = QueryConstants.SYSTEM_SCHEMA_NAME_BYTES;
public static final String SYSTEM_CATALOG_TABLE = "CATALOG";
+ public static final byte[] SYSTEM_CATALOG_TABLE_BYTES = Bytes.toBytes(SYSTEM_CATALOG_TABLE);
public static final String SYSTEM_CATALOG = SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"";
- public static final byte[] SYSTEM_CATALOG_SCHEMA_BYTES = Bytes.toBytes(SYSTEM_CATALOG_TABLE);
- public static final byte[] SYSTEM_CATALOG_TABLE_BYTES = Bytes.toBytes(SYSTEM_CATALOG_SCHEMA);
public static final String SYSTEM_CATALOG_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE);
- public static final byte[] SYSTEM_CATALOG_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_CATALOG_SCHEMA_BYTES);
+ public static final byte[] SYSTEM_CATALOG_NAME_BYTES = Bytes.toBytes(SYSTEM_CATALOG_NAME);
public static final String SYSTEM_STATS_TABLE = "STATS";
public static final String SYSTEM_STATS_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_STATS_TABLE);
public static final byte[] SYSTEM_STATS_NAME_BYTES = Bytes.toBytes(SYSTEM_STATS_NAME);
@@ -200,8 +200,13 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
public static final String TYPE_SEQUENCE = "SEQUENCE";
public static final byte[] SEQUENCE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
- public static final String SEQUENCE_TABLE_NAME = SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"";
- public static final byte[] SEQUENCE_TABLE_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, TYPE_SEQUENCE);
+ public static final String SEQUENCE_SCHEMA_NAME = SYSTEM_CATALOG_SCHEMA;
+ public static final byte[] SEQUENCE_SCHEMA_NAME_BYTES = Bytes.toBytes(SEQUENCE_SCHEMA_NAME);
+ public static final String SEQUENCE_TABLE_NAME = TYPE_SEQUENCE;
+ public static final byte[] SEQUENCE_TABLE_NAME_BYTES = Bytes.toBytes(SEQUENCE_TABLE_NAME);
+ public static final String SEQUENCE_FULLNAME_ESCAPED = SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"";
+ public static final String SEQUENCE_FULLNAME = SchemaUtil.getTableName(SEQUENCE_SCHEMA_NAME, SEQUENCE_TABLE_NAME);
+ public static final byte[] SEQUENCE_FULLNAME_BYTES = Bytes.toBytes(SEQUENCE_FULLNAME);
public static final String SEQUENCE_SCHEMA = "SEQUENCE_SCHEMA";
public static final String SEQUENCE_NAME = "SEQUENCE_NAME";
public static final String CURRENT_VALUE = "CURRENT_VALUE";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 9d2e194..c017b77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -110,4 +110,5 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public void addTableStats(String physicalName, PTableStats tableStats);
public void clearCache() throws SQLException;
+ public int getSequenceSaltBuckets();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index be4360e..2be059c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -176,6 +176,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private HConnection connection;
private volatile boolean initialized;
+ private volatile int nSequenceSaltBuckets;
// writes guarded by "this"
private volatile boolean closed;
@@ -1521,22 +1522,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
} catch (TableAlreadyExistsException ignore) {
}
+ int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
+ QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
try {
- metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
- } catch (NewerTableAlreadyExistsException ignore) {
+ String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets);
+ metaConnection.createStatement().executeUpdate(createSequenceTable);
+ nSequenceSaltBuckets = nSaltBuckets;
+ } catch (NewerTableAlreadyExistsException e) {
// Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
-
- } catch (TableAlreadyExistsException ignore) {
+ PTable sequenceTable = ConnectionQueryServicesImpl.this.latestMetaData.getTable(new PTableKey(null, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME));
+ Integer sequenceSaltBuckets = sequenceTable.getBucketNum();
+ nSequenceSaltBuckets = sequenceSaltBuckets == null ? 0 : sequenceSaltBuckets;
+ } catch (TableAlreadyExistsException e) {
// This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
// any new columns we've added.
- if (UpgradeUtil.addSaltByteToSequenceTable(metaConnection)) {
+ if (UpgradeUtil.addSaltByteToSequenceTable(metaConnection, nSaltBuckets)) {
metaConnection.removeTable(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.TYPE_SEQUENCE,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
clearCache();
}
+ nSequenceSaltBuckets = nSaltBuckets;
}
try {
metaConnection.createStatement().executeUpdate(
@@ -1700,7 +1708,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public long createSequence(String tenantId, String schemaName, String sequenceName,
long startWith, long incrementBy, long cacheSize, long minValue, long maxValue,
boolean cycle, long timestamp) throws SQLException {
- SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName);
+ SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName, nSequenceSaltBuckets);
Sequence newSequences = new Sequence(sequenceKey);
Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences);
if (sequence == null) {
@@ -1711,7 +1719,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Now that we have the lock we need, create the sequence
Append append = sequence.createSequence(startWith, incrementBy, cacheSize, timestamp, minValue, maxValue, cycle);
HTableInterface htable =
- this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
Result result = htable.append(append);
return sequence.createSequence(result, minValue, maxValue, cycle);
@@ -1727,7 +1735,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException {
- SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName);
+ SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName, nSequenceSaltBuckets);
Sequence newSequences = new Sequence(sequenceKey);
Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences);
if (sequence == null) {
@@ -1737,7 +1745,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
sequence.getLock().lock();
// Now that we have the lock we need, create the sequence
Append append = sequence.dropSequence(timestamp);
- HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
Result result = htable.append(append);
return sequence.dropSequence(result);
@@ -1836,7 +1844,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (toIncrementList.isEmpty()) {
return;
}
- HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
Object[] resultObjects = null;
SQLException sqlE = null;
try {
@@ -1955,7 +1963,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (toReturnList.isEmpty()) {
return;
}
- HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
Object[] resultObjects = null;
SQLException sqlE = null;
try {
@@ -2006,7 +2014,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (mutations.isEmpty()) {
return;
}
- HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
SQLException sqlE = null;
try {
hTable.batch(mutations);
@@ -2098,4 +2106,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public void addTableStats(String physicalName, PTableStats tableStats) {
tableStatsCache.put(physicalName, tableStats);
}
+ @Override
+ public int getSequenceSaltBuckets() {
+ return nSequenceSaltBuckets;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 4700d44..9bd30a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -225,7 +225,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
}
try {
- metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
+ int nSaltBuckets = getSequenceSaltBuckets();
+ String createTableStatement = Sequence.getCreateTableStatement(nSaltBuckets);
+ metaConnection.createStatement().executeUpdate(createTableStatement);
} catch (NewerTableAlreadyExistsException ignore) {
// Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
@@ -317,7 +319,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public long createSequence(String tenantId, String schemaName, String sequenceName,
long startWith, long incrementBy, long cacheSize, long minValue, long maxValue,
boolean cycle, long timestamp) throws SQLException {
- SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName);
+ SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName, getSequenceSaltBuckets());
if (sequenceMap.get(key) != null) {
throw new SequenceAlreadyExistsException(schemaName, sequenceName);
}
@@ -327,7 +329,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
@Override
public long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException {
- SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName);
+ SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName, getSequenceSaltBuckets());
if (sequenceMap.remove(key) == null) {
throw new SequenceNotFoundException(schemaName, sequenceName);
}
@@ -436,4 +438,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
@Override
public void clearCache() throws SQLException {
}
+
+ @Override
+ public int getSequenceSaltBuckets() {
+ return getProps().getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
+ QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 2bcacc6..34bca4d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -248,4 +248,9 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public void clearCache() throws SQLException {
getDelegate().clearCache();
}
+
+ @Override
+ public int getSequenceSaltBuckets() {
+ return getDelegate().getSequenceSaltBuckets();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 1fd7b15..f6b1cfa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -99,7 +99,6 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.util.ByteUtil;
@@ -267,7 +266,5 @@ public interface QueryConstants {
CYCLE_FLAG + " BOOLEAN, \n" +
LIMIT_REACHED_FLAG + " BOOLEAN \n" +
" CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
- HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
- "SALT_BUCKETS=" + SaltingUtil.MAX_BUCKET_NUM + "\n";
-
+ HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + "\n";
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 0734f19..7f000c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -137,6 +137,7 @@ public interface QueryServices extends SQLCloseable {
public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
+ public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index edcb597..e890cd7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -47,6 +47,7 @@ import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB;
@@ -60,6 +61,7 @@ import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -148,6 +150,11 @@ public class QueryServicesOptions {
public static final int DEFAULT_GUIDE_POSTS_PER_REGION = 20;
public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
+
+ /**
+ * Use only first time SYSTEM.SEQUENCE table is created.
+ */
+ public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = SaltingUtil.MAX_BUCKET_NUM;
private final Configuration config;
@@ -443,4 +450,10 @@ public class QueryServicesOptions {
public QueryServicesOptions setMinStatsUpdateFrequencyMs(int frequencyMs) {
return set(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs);
}
+
+ public QueryServicesOptions setSequenceSaltBuckets(int saltBuckets) {
+ config.setInt(SEQUENCE_SALT_BUCKETS_ATTRIB, saltBuckets);
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 3c02456..92fca0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -886,7 +886,8 @@ public class MetaDataClient {
PName tenantId = connection.getTenantId();
String tenantIdStr = tenantId == null ? null : connection.getTenantId().getString();
PName physicalName = dataTable.getPhysicalName();
- SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName);
+ int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
+ SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, nSequenceSaltBuckets);
// Create at parent timestamp as we know that will be earlier than now
// and earlier than any SCN if one is set.
createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index 08af961..21445e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -529,4 +529,11 @@ public class Sequence {
.setTableName(key.getSequenceName())
.build().buildException();
}
+
+ public static String getCreateTableStatement(int nSaltBuckets) {
+ if (nSaltBuckets <= 0) {
+ return QueryConstants.CREATE_SEQUENCE_METADATA;
+ }
+ return QueryConstants.CREATE_SEQUENCE_METADATA + "," + PhoenixDatabaseMetaData.SALT_BUCKETS + "=" + nSaltBuckets;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
index c25e438..94ca549 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
@@ -28,12 +28,14 @@ public class SequenceKey implements Comparable<SequenceKey> {
private final String sequenceName;
private final byte[] key;
- public SequenceKey(String tenantId, String schemaName, String sequenceName) {
+ public SequenceKey(String tenantId, String schemaName, String sequenceName, int nBuckets) {
this.tenantId = tenantId;
this.schemaName = schemaName;
this.sequenceName = sequenceName;
- this.key = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(tenantId), QueryConstants.SEPARATOR_BYTE_ARRAY, schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(schemaName), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(sequenceName));
- key[0] = SaltingUtil.getSaltingByte(key, SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES, SaltingUtil.MAX_BUCKET_NUM);
+ this.key = ByteUtil.concat(nBuckets <= 0 ? ByteUtil.EMPTY_BYTE_ARRAY : QueryConstants.SEPARATOR_BYTE_ARRAY, tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(tenantId), QueryConstants.SEPARATOR_BYTE_ARRAY, schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(schemaName), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(sequenceName));
+ if (nBuckets > 0) {
+ key[0] = SaltingUtil.getSaltingByte(key, SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES, SaltingUtil.MAX_BUCKET_NUM);
+ }
}
public byte[] getKey() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index 464e87d..7325161 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -291,14 +291,14 @@ public class MetaDataUtil {
return SchemaUtil.getTableName(schemaName, tableName);
}
- public static SequenceKey getViewIndexSequenceKey(String tenantId, PName physicalName) {
+ public static SequenceKey getViewIndexSequenceKey(String tenantId, PName physicalName, int nSaltBuckets) {
// Create global sequence of the form: <prefixed base table name><tenant id>
// rather than tenant-specific sequence, as it makes it much easier
// to cleanup when the physical table is dropped, as we can delete
// all global sequences leading with <prefix> + physical name.
String schemaName = VIEW_INDEX_SEQUENCE_PREFIX + physicalName.getString();
String tableName = tenantId == null ? "" : tenantId;
- return new SequenceKey(null, schemaName, tableName);
+ return new SequenceKey(null, schemaName, tableName, nSaltBuckets);
}
public static PDataType getViewIndexIdDataType() {
@@ -346,8 +346,9 @@ public class MetaDataUtil {
}
public static void deleteViewIndexSequences(PhoenixConnection connection, PName name) throws SQLException {
- SequenceKey key = getViewIndexSequenceKey(null, name);
- connection.createStatement().executeUpdate("DELETE FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME +
+ int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
+ SequenceKey key = getViewIndexSequenceKey(null, name, nSequenceSaltBuckets);
+ connection.createStatement().executeUpdate("DELETE FROM " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED +
" WHERE " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL AND " +
PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" + key.getSchemaName() + "'");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 30c328b..309b4be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -342,7 +342,7 @@ public class SchemaUtil {
}
public static boolean isSequenceTable(byte[] tableName) {
- return Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES) == 0;
+ return Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES) == 0;
}
public static boolean isMetaTable(PTable table) {
@@ -350,7 +350,7 @@ public class SchemaUtil {
}
public static boolean isMetaTable(byte[] schemaName, byte[] tableName) {
- return Bytes.compareTo(schemaName, PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES) == 0 && Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA_BYTES) == 0;
+ return Bytes.compareTo(schemaName, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA_BYTES) == 0 && Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES) == 0;
}
public static boolean isMetaTable(String schemaName, String tableName) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 4c8a369..3054200 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -45,105 +45,134 @@ public class UpgradeUtil {
private UpgradeUtil() {
}
- public static boolean addSaltByteToSequenceTable(PhoenixConnection conn) throws SQLException {
+ public static boolean addSaltByteToSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException {
+ if (nSaltBuckets <= 0) {
+ logger.info("Not upgrading SYSTEM.SEQUENCE table because SALT_BUCKETS is zero");
+ return false;
+ }
logger.info("Upgrading SYSTEM.SEQUENCE table");
+ byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE);
HTableInterface sysTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
try {
- byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE);
logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " + SaltingUtil.MAX_BUCKET_NUM);
KeyValue saltKV = KeyValueUtil.newKeyValue(seqTableKey,
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
- PDataType.INTEGER.toBytes(SaltingUtil.MAX_BUCKET_NUM));
- Put put = new Put(seqTableKey);
- put.add(saltKV);
+ PDataType.INTEGER.toBytes(nSaltBuckets));
+ Put saltPut = new Put(seqTableKey);
+ saltPut.add(saltKV);
// Prevent multiple clients from doing this upgrade
if (!sysTable.checkAndPut(seqTableKey,
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
- PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, put)) {
+ PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, saltPut)) {
logger.info("SYSTEM.SEQUENCE table has already been upgraded");
return false;
}
- } catch (IOException e) {
- throw ServerUtil.parseServerException(e);
- } finally {
- try {
- sysTable.close();
- } catch (IOException e) {
- logger.warn("Exception during close",e);
- }
- }
- int batchSizeBytes = 100 * 1024; // 100K chunks
- int sizeBytes = 0;
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000);
- boolean success = false;
- Scan scan = new Scan();
- scan.setRaw(true);
- scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
- HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
- try {
- logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
- ResultScanner scanner = seqTable.getScanner(scan);
+ int batchSizeBytes = 100 * 1024; // 100K chunks
+ int sizeBytes = 0;
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000);
+
+ boolean success = false;
+ Scan scan = new Scan();
+ scan.setRaw(true);
+ scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
+ HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
- Result result;
- while ((result = scanner.next()) != null) {
- for (KeyValue keyValue : result.raw()) {
- KeyValue newKeyValue = addSaltByte(keyValue);
- sizeBytes += newKeyValue.getLength();
- if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
- // Delete old value
- byte[] buf = keyValue.getBuffer();
- Delete delete = new Delete(keyValue.getRow());
- KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(),
- buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
- buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
- keyValue.getTimestamp(), KeyValue.Type.Delete,
- ByteUtil.EMPTY_BYTE_ARRAY,0,0);
- delete.addDeleteMarker(deleteKeyValue);
- mutations.add(delete);
- sizeBytes += deleteKeyValue.getLength();
- // Put new value
- Put put = new Put(newKeyValue.getRow());
- put.add(newKeyValue);
- mutations.add(put);
- } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
- // Copy delete marker using new key so that it continues
- // to delete the key value preceding it that will be updated
- // as well.
- Delete delete = new Delete(newKeyValue.getRow());
- delete.addDeleteMarker(newKeyValue);
- mutations.add(delete);
+ boolean committed = false;
+ logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+ ResultScanner scanner = seqTable.getScanner(scan);
+ try {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ for (KeyValue keyValue : result.raw()) {
+ KeyValue newKeyValue = addSaltByte(keyValue);
+ sizeBytes += newKeyValue.getLength();
+ if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
+ // Delete old value
+ byte[] buf = keyValue.getBuffer();
+ Delete delete = new Delete(keyValue.getRow());
+ KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(),
+ buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
+ buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
+ keyValue.getTimestamp(), KeyValue.Type.Delete,
+ ByteUtil.EMPTY_BYTE_ARRAY,0,0);
+ delete.addDeleteMarker(deleteKeyValue);
+ mutations.add(delete);
+ sizeBytes += deleteKeyValue.getLength();
+ // Put new value
+ Put put = new Put(newKeyValue.getRow());
+ put.add(newKeyValue);
+ mutations.add(put);
+ } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
+ // Copy delete marker using new key so that it continues
+ // to delete the key value preceding it that will be updated
+ // as well.
+ Delete delete = new Delete(newKeyValue.getRow());
+ delete.addDeleteMarker(newKeyValue);
+ mutations.add(delete);
+ }
+ if (sizeBytes >= batchSizeBytes) {
+ logger.info("Committing bactch of SYSTEM.SEQUENCE rows");
+ seqTable.batch(mutations);
+ mutations.clear();
+ sizeBytes = 0;
+ committed = true;
+ }
}
- if (sizeBytes >= batchSizeBytes) {
- logger.info("Committing bactch of SYSTEM.SEQUENCE rows");
- seqTable.batch(mutations);
- mutations.clear();
- sizeBytes = 0;
+ }
+ if (!mutations.isEmpty()) {
+ logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
+ seqTable.batch(mutations);
+ }
+ logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
+ success = true;
+ return true;
+ } catch (InterruptedException e) {
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ scanner.close();
+ } finally {
+ if (!success) {
+ if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything
+ // Don't use Delete here as we'd never be able to change it again at this timestamp.
+ KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey,
+ PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+ PDataType.INTEGER.toBytes(0));
+ Put unsaltPut = new Put(seqTableKey);
+ unsaltPut.add(unsaltKV);
+ try {
+ sysTable.put(unsaltPut);
+ success = true;
+ } finally {
+ if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
+ }
+ } else { // We're screwed b/c we've already committed some salted sequences...
+ logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
+ }
}
}
}
- if (!mutations.isEmpty()) {
- logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
- seqTable.batch(mutations);
- }
- logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
- success = true;
- return true;
- } catch (InterruptedException e) {
+ } catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
- if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
- scanner.close();
+ try {
+ seqTable.close();
+ } catch (IOException e) {
+ logger.warn("Exception during close",e);
+ }
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
try {
- seqTable.close();
+ sysTable.close();
} catch (IOException e) {
logger.warn("Exception during close",e);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index b1f90a3..a58bfef 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -812,7 +812,7 @@ public abstract class BaseTest {
ResultSet rs = conn.createStatement().executeQuery("SELECT "
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + ","
+ PhoenixDatabaseMetaData.SEQUENCE_NAME
- + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
+ + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED);
while (rs.next()) {
try {
conn.createStatement().execute("DROP SEQUENCE " + SchemaUtil.getTableName(rs.getString(1), rs.getString(2)));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4a1087e/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 0dcef11..2af2666 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -33,7 +33,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
private static final int DEFAULT_THREAD_POOL_SIZE = 20;
- private static final int DEFAULT_QUEUE_SIZE = 1000;
+ private static final int DEFAULT_QUEUE_SIZE = 0;
// TODO: setting this down to 5mb causes insufficient memory exceptions. Need to investigate why
private static final int DEFAULT_MAX_MEMORY_PERC = 30; // 30% of heap
private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000*5; //5min
@@ -52,6 +52,12 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE = 1024L*1024L*4L; // 4 Mb
public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE = 1024L*1024L*2L; // 2 Mb
public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0;
+
+ /**
+ * Set number of salt buckets lower for sequence table during testing, as a high
+ * value overwhelms our mini clusters.
+ */
+ public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = 4;
public QueryServicesTestImpl(ReadOnlyProps defaultProps) {
@@ -60,6 +66,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
private static QueryServicesOptions getDefaultServicesOptions() {
return withDefaults()
+ .setSequenceSaltBuckets(DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS)
.setMinStatsUpdateFrequencyMs(DEFAULT_MIN_STATS_UPDATE_FREQ_MS)
.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE)
.setQueueSize(DEFAULT_QUEUE_SIZE)