You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:43:41 UTC
[03/52] [abbrv] phoenix git commit: PHOENIX-1674 Snapshot isolation
transaction support through Tephra (James Taylor, Thomas D'Silva)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 18a97bd..2a08f45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -27,6 +27,7 @@ import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -115,6 +116,7 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.index.PhoenixTransactionalIndexer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -163,9 +165,20 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.UpgradeUtil;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.PooledClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.hbase11.coprocessor.TransactionProcessor;
+
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
@@ -183,6 +196,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Max number of cached table stats for view or shared index physical tables
private static final int MAX_TABLE_STATS_CACHE_ENTRIES = 512;
protected final Configuration config;
+ private final ConnectionInfo connectionInfo;
// Copy of config.getProps(), but read-only to prevent synchronization that we
// don't need.
private final ReadOnlyProps props;
@@ -197,7 +211,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Lowest HBase version on the cluster.
private int lowestClusterHBaseVersion = Integer.MAX_VALUE;
- private boolean hasInvalidIndexConfiguration = false;
+ private boolean isMutableIndexWALCodecInstalled = true;
@GuardedBy("connectionCountLock")
private int connectionCount = 0;
@@ -205,6 +219,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final boolean returnSequenceValues ;
private HConnection connection;
+ private TransactionServiceClient txServiceClient;
private volatile boolean initialized;
private volatile int nSequenceSaltBuckets;
@@ -257,6 +272,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (Entry<String,String> entry : connectionInfo.asProps()) {
config.set(entry.getKey(), entry.getValue());
}
+ this.connectionInfo = connectionInfo;
// Without making a copy of the configuration we cons up, we lose some of our properties
// on the server side during testing.
@@ -282,6 +298,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
this.returnSequenceValues = config.getBoolean(QueryServices.RETURN_SEQUENCE_VALUES_ATTRIB, QueryServicesOptions.DEFAULT_RETURN_SEQUENCE_VALUES);
}
+ @Override
+ public TransactionSystemClient getTransactionSystemClient() { // Returns the txServiceClient field, which initTxServiceClient() assigns
+ return txServiceClient;
+ }
+
+ private void initTxServiceClient() { // Starts a ZooKeeper client, wraps it in a discovery service and stores a new TransactionServiceClient in txServiceClient
+ String zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort(); // NOTE(review): the port is appended once to the whole quorum string; confirm this is correct when the quorum lists several hosts or the port is unset
+ ZKClientService zkClientService = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(zkQuorumServersString)
+ .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) // session timeout comes from the HBase configuration
+ .build(),
+ RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) // presumably base delay 500 ms, capped at 2000 ms - TODO confirm argument order
+ )
+ )
+ );
+ zkClientService.startAndWait(); // NOTE(review): nothing visible in this hunk stops this service; confirm it is shut down when the connection services close
+ ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+ PooledClientProvider pooledClientProvider = new PooledClientProvider(
+ config, zkDiscoveryService);
+ this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
+ }
+
private void openConnection() throws SQLException {
try {
// check if we need to authenticate with kerberos
@@ -293,6 +333,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
User.login(config, HBASE_CLIENT_KEYTAB, HBASE_CLIENT_PRINCIPAL, null);
logger.info("Successfull login to secure cluster!!");
}
+ initTxServiceClient();
this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
} catch (IOException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
@@ -308,9 +349,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, null);
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
- byte[][] schemaAndTableName = new byte[2][];
- SchemaUtil.getVarChars(tableName, schemaAndTableName);
- throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1]));
+ throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName));
} catch (IOException e) {
throw new SQLException(e);
}
@@ -457,18 +496,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public PMetaData addTable(PTable table) throws SQLException {
+ public PMetaData addTable(PTable table, long resolvedTime) throws SQLException { // Adds table to latestMetaData with resolvedTime unless the cached entry's timestamp is >= the new table's
synchronized (latestMetaDataLock) {
try {
throwConnectionClosedIfNullMetaData();
// If existing table isn't older than new table, don't replace
// If a client opens a connection at an earlier timestamp, this can happen
- PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
+ PTable existingTable = latestMetaData.getTableRef(new PTableKey(table.getTenantId(), table.getName().getString())).getTable(); // lookup now goes through getTableRef(...).getTable()
if (existingTable.getTimeStamp() >= table.getTimeStamp()) {
return latestMetaData;
}
} catch (TableNotFoundException e) {}
- latestMetaData = latestMetaData.addTable(table);
+ latestMetaData = latestMetaData.addTable(table, resolvedTime);
+ latestMetaDataLock.notifyAll(); // wake any threads waiting on latestMetaDataLock
+ return latestMetaData;
+ }
+ }
+
+ public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
+ synchronized (latestMetaDataLock) {
+ throwConnectionClosedIfNullMetaData();
+ latestMetaData = latestMetaData.updateResolvedTimestamp(table, resolvedTime);
latestMetaDataLock.notifyAll();
return latestMetaData;
}
@@ -490,7 +538,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
while (true) {
try {
try {
- table = metaData.getTable(new PTableKey(tenantId, tableName));
+ table = metaData.getTableRef(new PTableKey(tenantId, tableName)).getTable();
/* If the table is at the prior sequence number, then we're good to go.
* We know if we've got this far, that the server validated the mutations,
* so we'd just need to wait until the other connection that mutated the same
@@ -531,12 +579,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls) throws SQLException {
+ public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls, final boolean isTransactional, final long resolvedTime) throws SQLException {
return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
@Override
public PMetaData mutate(PMetaData metaData) throws SQLException {
try {
- return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
+ return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
} catch (TableNotFoundException e) {
// The DROP TABLE may have been processed first, so just ignore.
return metaData;
@@ -556,12 +604,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public PMetaData removeColumn(final PName tenantId, final String tableName, final List<PColumn> columnsToRemove, final long tableTimeStamp, final long tableSeqNum) throws SQLException {
+ public PMetaData removeColumn(final PName tenantId, final String tableName, final List<PColumn> columnsToRemove, final long tableTimeStamp, final long tableSeqNum, final long resolvedTime) throws SQLException {
return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
@Override
public PMetaData mutate(PMetaData metaData) throws SQLException {
try {
- return metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum);
+ return metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
} catch (TableNotFoundException e) {
// The DROP TABLE may have been processed first, so just ignore.
return metaData;
@@ -594,25 +642,40 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (Entry<String,Object> entry : family.getSecond().entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
- columnDesc.setValue(key, value == null ? null : value.toString());
+ setHColumnDescriptorValue(columnDesc, key, value);
}
}
return columnDesc;
}
-
- private void modifyColumnFamilyDescriptor(HColumnDescriptor hcd, Pair<byte[], Map<String,Object>> family) throws SQLException {
- if (Bytes.equals(hcd.getName(), family.getFirst())) {
- modifyColumnFamilyDescriptor(hcd, family.getSecond());
+
+ // Workaround HBASE-14737: the VERSIONS key is applied through setMaxVersions(int); every other key is stored via setValue as a string (or null).
+ private static void setHColumnDescriptorValue(HColumnDescriptor columnDesc, String key, Object value) {
+ if (HConstants.VERSIONS.equals(key)) {
+ columnDesc.setMaxVersions(getMaxVersion(value)); // getMaxVersion converts the loosely typed property value to an int
} else {
- throw new IllegalArgumentException("Column family names don't match. Column descriptor family name: " + hcd.getNameAsString() + ", Family name: " + Bytes.toString(family.getFirst()));
+ columnDesc.setValue(key, value == null ? null : value.toString());
}
}
+
+ private static int getMaxVersion(Object value) { // Converts a VERSIONS property value to an int; null or an empty string yields -1
+ if (value == null) {
+ return -1; // HColumnDescriptor.UNINITIALIZED is private
+ }
+ if (value instanceof Number) {
+ return ((Number)value).intValue(); // numeric values are narrowed to int
+ }
+ String stringValue = value.toString();
+ if (stringValue.isEmpty()) {
+ return -1; // same sentinel as the null case above
+ }
+ return Integer.parseInt(stringValue); // throws NumberFormatException if the text is not a plain integer
+ }
private void modifyColumnFamilyDescriptor(HColumnDescriptor hcd, Map<String,Object> props) throws SQLException {
for (Entry<String, Object> entry : props.entrySet()) {
String propName = entry.getKey();
Object value = entry.getValue();
- hcd.setValue(propName, value == null ? null : value.toString());
+ setHColumnDescriptorValue(hcd, propName, value); // applies each property via the helper, which handles the VERSIONS key through setMaxVersions
}
}
@@ -622,8 +685,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new HTableDescriptor(TableName.valueOf(tableName));
for (Entry<String,Object> entry : tableProps.entrySet()) {
String key = entry.getKey();
- Object value = entry.getValue();
- tableDescriptor.setValue(key, value == null ? null : value.toString());
+ if (!TableProperty.isPhoenixTableProperty(key)) {
+ Object value = entry.getValue();
+ tableDescriptor.setValue(key, value == null ? null : value.toString());
+ }
}
if (families.isEmpty()) {
if (tableType != PTableType.VIEW) {
@@ -649,16 +714,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
tableDescriptor.addFamily(columnDescriptor);
} else {
if (tableType != PTableType.VIEW) {
- modifyColumnFamilyDescriptor(tableDescriptor.getFamily(familyByte), family);
+ HColumnDescriptor columnDescriptor = tableDescriptor.getFamily(familyByte);
+ if (columnDescriptor == null) {
+ throw new IllegalArgumentException("Unable to find column descriptor with family name " + Bytes.toString(family.getFirst()));
+ }
+ modifyColumnFamilyDescriptor(columnDescriptor, family.getSecond());
}
}
}
}
- addCoprocessors(tableName, tableDescriptor, tableType);
+ addCoprocessors(tableName, tableDescriptor, tableType, tableProps);
return tableDescriptor;
}
- private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType) throws SQLException {
+ private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String,Object> tableProps) throws SQLException {
// The phoenix jar must be available on HBase classpath
int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
try {
@@ -674,17 +743,35 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
}
+ boolean isTransactional =
+ Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) ||
+ Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
// Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
// all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318).
if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW)
&& !SchemaUtil.isMetaTable(tableName)
- && !SchemaUtil.isStatsTable(tableName)
- && !descriptor.hasCoprocessor(Indexer.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ && !SchemaUtil.isStatsTable(tableName)) {
+ if (isTransactional) {
+ if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+ descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
+ }
+ // For alter table, remove non transactional index coprocessor
+ if (descriptor.hasCoprocessor(Indexer.class.getName())) {
+ descriptor.removeCoprocessor(Indexer.class.getName());
+ }
+ } else {
+ if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+ // If exception on alter table to transition back to non transactional
+ if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+ descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
+ }
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ }
+ }
}
if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
@@ -722,6 +809,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null);
}
}
+
+ if (isTransactional) {
+ if (!descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+ descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
+ }
+ } else {
+ // If exception on alter table to transition back to non transactional
+ if (descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+ descriptor.removeCoprocessor(TransactionProcessor.class.getName());
+ }
+ }
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
}
@@ -879,8 +977,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
checkClientServerCompatibility();
}
- if (!modifyExistingMetaData || existingDesc.equals(newDesc)) {
- return existingDesc;
+ if (!modifyExistingMetaData) {
+ return existingDesc; // Caller already knows that no metadata was changed
+ }
+ boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name()));
+ // If mapping an existing table as transactional, set property so that existing
+ // data is correctly read.
+ if (willBeTx) {
+ newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString());
+ } else {
+ // If we think we're creating a non transactional table when it's already
+ // transactional, don't allow.
+ if (existingDesc.hasCoprocessor(TransactionProcessor.class.getName())) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
+ .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName))
+ .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException();
+ }
+ newDesc.remove(TxConstants.READ_NON_TX_DATA);
+ }
+ if (existingDesc.equals(newDesc)) {
+ return null; // Indicate that no metadata was changed
}
modifyTable(tableName, newDesc, true);
@@ -987,7 +1103,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
buf.append(name);
buf.append(';');
}
- hasInvalidIndexConfiguration |= isInvalidMutableIndexConfig(result.getValue());
+ isMutableIndexWALCodecInstalled &= !isInvalidMutableIndexConfig(result.getValue());
if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(result.getValue())) {
minHBaseVersion = MetaDataUtil.decodeHBaseVersion(result.getValue());
}
@@ -1096,7 +1212,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
synchronized (latestMetaDataLock) {
throwConnectionClosedIfNullMetaData();
- table = latestMetaData.getTable(new PTableKey(PName.EMPTY_NAME, parentTableName));
+ table = latestMetaData.getTableRef(new PTableKey(PName.EMPTY_NAME, parentTableName)).getTable();
latestMetaDataLock.notifyAll();
}
if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case
@@ -1271,7 +1387,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
- instance.createTable(controller, builder.build(), rpcCallback);
+ CreateTableRequest build = builder.build();
+ instance.createTable(controller, build, rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
@@ -1460,7 +1577,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (metadata == null) {
throwConnectionClosedException();
}
- table = metadata.getTable(new PTableKey(tenantId, name));
+ table = metadata.getTableRef(new PTableKey(tenantId, name)).getTable();
if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case
throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
}
@@ -1505,22 +1622,213 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> stmtProperties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException {
- Map<String, Object> tableProps = new HashMap<String, Object>();
- HTableDescriptor tableDescriptor = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, tableProps);
- SQLException sqlE = null;
+ List<Pair<byte[], Map<String, Object>>> families = new ArrayList<>(stmtProperties.size()); // passed to separateAndValidateProperties below; presumably filled with per-family properties - TODO confirm
+ Map<String, Object> tableProps = new HashMap<String, Object>();
+ Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
+ Set<HTableDescriptor> origTableDescriptors = Collections.emptySet();
+ boolean nonTxToTx = false;
+ Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, families, tableProps);
+ HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond(); // second element is used as the descriptor to send
+ HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst(); // first element is kept as the descriptor to restore on failure
if (tableDescriptor != null) {
- try {
- boolean modifyHTable = true;
- if (table.getType() == PTableType.VIEW) {
- boolean canViewsAddNewCF = props.getBoolean(QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE,
- QueryServicesOptions.DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE);
- // When adding a column to a view, base physical table should only be modified when new column families are being added.
- modifyHTable = canViewsAddNewCF && !existingColumnFamiliesForBaseTable(table.getPhysicalName()).containsAll(colFamiliesForPColumnsToBeAdded);
+ tableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
+ origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
+ tableDescriptors.add(tableDescriptor);
+ origTableDescriptors.add(origTableDescriptor);
+ nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA));
+ /*
+ * If the table was transitioned from non transactional to transactional, we need
+ * to also transition the index tables.
+ */
+ if (nonTxToTx) {
+ updateDescriptorForTx(table, tableProps, tableDescriptor, Boolean.TRUE.toString(), tableDescriptors, origTableDescriptors);
+ }
+ }
+
+ boolean success = false;
+ boolean metaDataUpdated = !tableDescriptors.isEmpty(); // NOTE(review): true whenever descriptors were collected, even if modifyHTable below ends up false and nothing is sent
+ boolean pollingNeeded = !(!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty()); // polling is skipped only when table properties are set and no column families are involved
+ MetaDataMutationResult result = null;
+ try {
+ boolean modifyHTable = true;
+ if (table.getType() == PTableType.VIEW) {
+ boolean canViewsAddNewCF = props.getBoolean(QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE,
+ QueryServicesOptions.DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE);
+ // When adding a column to a view, base physical table should only be modified when new column families are being added.
+ modifyHTable = canViewsAddNewCF && !existingColumnFamiliesForBaseTable(table.getPhysicalName()).containsAll(colFamiliesForPColumnsToBeAdded);
+ }
+ if (modifyHTable) {
+ sendHBaseMetaData(tableDescriptors, pollingNeeded);
+ }
+
+ // Special case for call during drop table to ensure that the empty column family exists.
+ // In this case, we only include the table header row, as until we add schemaBytes and tableBytes
+ // as args to this function, we have no way of getting them in this case.
+ // TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes
+ // Also, could be used to update property values on ALTER TABLE t SET prop=xxx
+ if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) {
+ return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table); // NOTE(review): success is still false here, so the finally block restores origTableDescriptors if nonTxToTx is set - confirm intended
+ }
+ byte[][] rowKeyMetaData = new byte[3][];
+ PTableType tableType = table.getType();
+
+ Mutation m = tableMetaData.get(0);
+ byte[] rowKey = m.getRow();
+ SchemaUtil.getVarChars(rowKey, rowKeyMetaData);
+ byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+ byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+ byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
+
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ result = metaDataCoprocessorExec(tableKey,
+ new Batch.Call<MetaDataService, MetaDataResponse>() {
+ @Override
+ public MetaDataResponse call(MetaDataService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<MetaDataResponse> rpcCallback =
+ new BlockingRpcCallback<MetaDataResponse>();
+ AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
+ for (Mutation m : tableMetaData) {
+ MutationProto mp = ProtobufUtil.toProto(m);
+ builder.addTableMetadataMutations(mp.toByteString());
+ }
+ builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
+ instance.addColumn(controller, builder.build(), rpcCallback);
+ if(controller.getFailedOn() != null) {
+ throw controller.getFailedOn();
+ }
+ return rpcCallback.get();
+ }
+ });
+
+ if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND || result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) { // Success
+ success = true;
+ // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE
+ if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr)
+ && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) {
+ flushTable(table.getPhysicalName().getBytes());
+ }
+
+ if (tableType == PTableType.TABLE) {
+ // If we're changing MULTI_TENANT to true or false, create or drop the view index table
+ if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){
+ long timestamp = MetaDataUtil.getClientTimeStamp(m);
+ if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) {
+ this.ensureViewIndexTableCreated(table, timestamp);
+ } else {
+ this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp);
+ }
+ }
}
- boolean pollingNotNeeded = (!tableProps.isEmpty() && !existingColumnFamilies(table).containsAll(colFamiliesForPColumnsToBeAdded));
- if (modifyHTable) {
- modifyTable(table.getPhysicalName().getBytes(), tableDescriptor, !pollingNotNeeded);
+ }
+ } finally {
+ // If we weren't successful with our metadata update
+ // and we've already pushed the HBase metadata changes to the server
+ // and we've tried to go from non transactional to transactional
+ // then we must undo the metadata change otherwise the table will
+ // no longer function correctly.
+ // Note that if this fails, we're in a corrupt state.
+ if (!success && metaDataUpdated && nonTxToTx) {
+ sendHBaseMetaData(origTableDescriptors, pollingNeeded); // NOTE(review): an SQLException thrown here would replace any exception already propagating from the try block
+ }
+ }
+ return result;
+ }
+ private void updateDescriptorForTx(PTable table, Map<String, Object> tableProps, HTableDescriptor tableDescriptor,
+ String txValue, Set<HTableDescriptor> descriptorsToUpdate, Set<HTableDescriptor> origDescriptors) throws SQLException { // Calls setTransactional on tableDescriptor and on copies of each index, view-index and local-index descriptor, collecting the copies and the fetched originals in the two sets
+ HBaseAdmin admin = null;
+ byte[] physicalTableName = table.getPhysicalName().getBytes();
+ try {
+ admin = new HBaseAdmin(config); // closed in the finally block below
+ setTransactional(tableDescriptor, table.getType(), txValue, tableProps);
+ Map<String, Object> indexTableProps;
+ if (txValue == null) {
+ indexTableProps = Collections.<String,Object>emptyMap();
+ } else {
+ indexTableProps = Maps.newHashMapWithExpectedSize(1);
+ indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue));
+ }
+ for (PTable index : table.getIndexes()) {
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes());
+ origDescriptors.add(indexDescriptor); // keep the descriptor as fetched so the caller can restore it
+ indexDescriptor = new HTableDescriptor(indexDescriptor); // modify a new descriptor built from the fetched one
+ descriptorsToUpdate.add(indexDescriptor);
+ if (index.getColumnFamilies().isEmpty()) {
+ byte[] dataFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+ byte[] indexFamilyName = SchemaUtil.getEmptyColumnFamily(index);
+ HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(indexFamilyName); // NOTE(review): getFamily results are dereferenced without a null check in this loop - confirm they cannot be null
+ HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(dataFamilyName);
+ indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
+ indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+ } else {
+ for (PColumnFamily family : index.getColumnFamilies()) {
+ byte[] familyName = family.getName().getBytes();
+ indexDescriptor.getFamily(familyName).setMaxVersions(tableDescriptor.getFamily(familyName).getMaxVersions()); // NOTE(review): appears redundant with the setMaxVersions call three lines below
+ HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
+ HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
+ indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
+ indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+ }
}
+ setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps);
+ }
+ try {
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getViewIndexPhysicalName(physicalTableName));
+ origDescriptors.add(indexDescriptor);
+ indexDescriptor = new HTableDescriptor(indexDescriptor);
+ descriptorsToUpdate.add(indexDescriptor);
+ setSharedIndexMaxVersion(table, tableDescriptor, indexDescriptor);
+ setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps);
+ } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
+ // Ignore, as we may never have created a view index table
+ }
+ try {
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getLocalIndexPhysicalName(physicalTableName));
+ origDescriptors.add(indexDescriptor);
+ indexDescriptor = new HTableDescriptor(indexDescriptor);
+ descriptorsToUpdate.add(indexDescriptor);
+ setSharedIndexMaxVersion(table, tableDescriptor, indexDescriptor);
+ setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps);
+ } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
+ // Ignore, as we may never have created a local index table
+ }
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ if (admin != null) admin.close();
+ } catch (IOException e) {
+ logger.warn("Could not close admin",e);
+ }
+ }
+ }
+ private void setSharedIndexMaxVersion(PTable table, HTableDescriptor tableDescriptor,
+ HTableDescriptor indexDescriptor) { // Copies max versions and the TxConstants.PROPERTY_TTL value from tableDescriptor's column families to the same-named families of indexDescriptor
+ if (table.getColumnFamilies().isEmpty()) {
+ byte[] familyName = SchemaUtil.getEmptyColumnFamily(table); // no declared families: use the table's empty column family
+ HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
+ HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
+ indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
+ indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+ } else {
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ byte[] familyName = family.getName().getBytes();
+ HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
+ if (indexColDescriptor != null) { // skip families that the index descriptor does not have
+ HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
+ indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
+ indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+ }
+ }
+ }
+ }
+
+ private void sendHBaseMetaData(Set<HTableDescriptor> tableDescriptors, boolean pollingNeeded) throws SQLException {
+ SQLException sqlE = null;
+ for (HTableDescriptor descriptor : tableDescriptors) {
+ try {
+ modifyTable(descriptor.getName(), descriptor, pollingNeeded);
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} catch (InterruptedException e) {
@@ -1535,76 +1843,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
-
- // Special case for call during drop table to ensure that the empty column family exists.
- // In this, case we only include the table header row, as until we add schemaBytes and tableBytes
- // as args to this function, we have no way of getting them in this case.
- // TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes
- // Also, could be used to update property values on ALTER TABLE t SET prop=xxx
- if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) {
- return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table);
- }
- byte[][] rowKeyMetaData = new byte[3][];
- PTableType tableType = table.getType();
-
- Mutation m = tableMetaData.get(0);
- byte[] rowKey = m.getRow();
- SchemaUtil.getVarChars(rowKey, rowKeyMetaData);
- byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
- byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
- byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
- byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
-
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- MetaDataMutationResult result = metaDataCoprocessorExec(tableKey,
- new Batch.Call<MetaDataService, MetaDataResponse>() {
- @Override
- public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<MetaDataResponse> rpcCallback =
- new BlockingRpcCallback<MetaDataResponse>();
- AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
- for (Mutation m : tableMetaData) {
- MutationProto mp = ProtobufUtil.toProto(m);
- builder.addTableMetadataMutations(mp.toByteString());
- }
- builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
- instance.addColumn(controller, builder.build(), rpcCallback);
- if(controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
- return rpcCallback.get();
- }
- });
-
- if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND) { // Success
- // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE
- if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr)
- && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) {
- flushTable(table.getPhysicalName().getBytes());
- }
-
- if (tableType == PTableType.TABLE) {
- // If we're changing MULTI_TENANT to true or false, create or drop the view index table
- if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){
- long timestamp = MetaDataUtil.getClientTimeStamp(m);
- if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) {
- this.ensureViewIndexTableCreated(table, timestamp);
- } else {
- this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp);
- }
- }
- }
+ }
+ /**
+ * Updates the TxConstants.READ_NON_TX_DATA value on the descriptor and then re-runs
+ * addCoprocessors for it.
+ *
+ * @param tableDescriptor descriptor to modify in place
+ * @param tableType passed through to addCoprocessors
+ * @param txValue value to store under READ_NON_TX_DATA; null removes the key instead
+ * @param tableProps passed through to addCoprocessors
+ * @throws SQLException declared by this method; raised by addCoprocessors if it fails
+ */
+ private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException {
+ if (txValue == null) {
+ tableDescriptor.remove(TxConstants.READ_NON_TX_DATA);
+ } else {
+ tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue);
}
- return result;
+ // The descriptor's own name is used as the table name for the coprocessor setup
+ this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
}
- private HTableDescriptor separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, Map<String, Object> tableProps) throws SQLException {
+ private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException {
Map<String, Map<String, Object>> stmtFamiliesPropsMap = new HashMap<>(properties.size());
Map<String,Object> commonFamilyProps = new HashMap<>();
- boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && colFamiliesForPColumnsToBeAdded.size() > 0;
+ boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && !colFamiliesForPColumnsToBeAdded.isEmpty();
HashSet<String> existingColumnFamilies = existingColumnFamilies(table);
Map<String, Map<String, Object>> allFamiliesProps = new HashMap<>(existingColumnFamilies.size());
+ boolean isTransactional = table.isTransactional();
+ boolean willBeTransactional = false;
+ boolean isOrWillBeTransactional = isTransactional;
+ Integer newTTL = null;
for (String family : properties.keySet()) {
List<Pair<String, Object>> propsList = properties.get(family);
if (propsList != null && propsList.size() > 0) {
@@ -1630,22 +1888,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (TableProperty.isPhoenixTableProperty(propName)) {
TableProperty.valueOf(propName).validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType());
if (propName.equals(TTL)) {
+ newTTL = ((Number)prop.getSecond()).intValue();
// Even though TTL is really a HColumnProperty we treat it specially.
// We enforce that all column families have the same TTL.
commonFamilyProps.put(propName, prop.getSecond());
+ } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) {
+ willBeTransactional = isOrWillBeTransactional = true;
+ tableProps.put(TxConstants.READ_NON_TX_DATA, propValue);
}
} else {
if (isHColumnProperty(propName)) {
if (family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
- commonFamilyProps.put(propName, prop.getSecond());
+ commonFamilyProps.put(propName, propValue);
} else {
- colFamilyPropsMap.put(propName, prop.getSecond());
+ colFamilyPropsMap.put(propName, propValue);
}
} else {
// invalid property - neither of HTableProp, HColumnProp or PhoenixTableProp
// FIXME: This isn't getting triggered as currently a property gets evaluated
// as HTableProp if its neither HColumnProp or PhoenixTableProp.
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE)
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY)
.setMessage("Column Family: " + family + ", Property: " + propName).build()
.buildException();
}
@@ -1728,16 +1990,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
allFamiliesProps.put(Bytes.toString(table.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : table.getDefaultFamilyName().getBytes() ), commonFamilyProps);
}
-
// Views are not allowed to have any of these properties.
if (table.getType() == PTableType.VIEW && (!stmtFamiliesPropsMap.isEmpty() || !commonFamilyProps.isEmpty() || !tableProps.isEmpty())) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
.buildException();
}
+
HTableDescriptor newTableDescriptor = null;
+ HTableDescriptor origTableDescriptor = null;
if (!allFamiliesProps.isEmpty() || !tableProps.isEmpty()) {
byte[] tableNameBytes = Bytes.toBytes(table.getPhysicalName().getString());
- HTableDescriptor existingTableDescriptor = getTableDescriptor(tableNameBytes);
+ HTableDescriptor existingTableDescriptor = origTableDescriptor = getTableDescriptor(tableNameBytes);
newTableDescriptor = new HTableDescriptor(existingTableDescriptor);
if (!tableProps.isEmpty()) {
// add all the table properties to the existing table descriptor
@@ -1747,21 +2010,97 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
if (addingColumns) {
// Make sure that all the CFs of the table have the same TTL as the empty CF.
- setTTLToEmptyCFTTL(allFamiliesProps, table, newTableDescriptor);
+ setTTLForNewCFs(allFamiliesProps, table, newTableDescriptor, newTTL);
+ }
+ // Set TTL on all table column families, even if they're not referenced here
+ if (newTTL != null) {
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ if (!allFamiliesProps.containsKey(family.getName().getString())) {
+ Map<String,Object> familyProps = Maps.newHashMapWithExpectedSize(1);
+ familyProps.put(TTL, newTTL);
+ allFamiliesProps.put(family.getName().getString(), familyProps);
+ }
+ }
+ }
+ Integer defaultTxMaxVersions = null;
+ if (isOrWillBeTransactional) {
+ // Calculate default for max versions
+ Map<String, Object> emptyFamilyProps = allFamiliesProps.get(SchemaUtil.getEmptyColumnFamilyAsString(table));
+ if (emptyFamilyProps != null) {
+ defaultTxMaxVersions = (Integer)emptyFamilyProps.get(HConstants.VERSIONS);
+ }
+ if (defaultTxMaxVersions == null) {
+ if (isTransactional) {
+ defaultTxMaxVersions = newTableDescriptor.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getMaxVersions();
+ } else {
+ defaultTxMaxVersions =
+ this.getProps().getInt(
+ QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL);
+ }
+ }
+ if (willBeTransactional) {
+ // Set VERSIONS for all column families when transitioning to transactional
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ if (!allFamiliesProps.containsKey(family.getName().getString())) {
+ Map<String,Object> familyProps = Maps.newHashMapWithExpectedSize(1);
+ familyProps.put(HConstants.VERSIONS, defaultTxMaxVersions);
+ allFamiliesProps.put(family.getName().getString(), familyProps);
+ }
+ }
+ }
+ }
+ // Set Tephra's TTL property based on HBase property if we're
+ // transitioning to become transactional or setting TTL on
+ // an already transactional table.
+ if (isOrWillBeTransactional) {
+ int ttl = getTTL(table, newTableDescriptor, newTTL);
+ if (ttl != HColumnDescriptor.DEFAULT_TTL) {
+ for (Map.Entry<String, Map<String, Object>> entry : allFamiliesProps.entrySet()) {
+ Map<String, Object> props = entry.getValue();
+ if (props == null) {
+ props = new HashMap<String, Object>();
+ }
+ props.put(TxConstants.PROPERTY_TTL, ttl);
+ // Remove HBase TTL if we're not transitioning an existing table to become transactional
+ // or if the existing transactional table wasn't originally non transactional.
+ if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(TxConstants.READ_NON_TX_DATA))) {
+ props.remove(TTL);
+ }
+ }
+ }
}
for (Entry<String, Map<String, Object>> entry : allFamiliesProps.entrySet()) {
- byte[] cf = entry.getKey().getBytes();
+ Map<String,Object> familyProps = entry.getValue();
+ if (isOrWillBeTransactional) {
+ if (!familyProps.containsKey(HConstants.VERSIONS)) {
+ familyProps.put(HConstants.VERSIONS, defaultTxMaxVersions);
+ }
+ }
+ byte[] cf = Bytes.toBytes(entry.getKey());
HColumnDescriptor colDescriptor = newTableDescriptor.getFamily(cf);
if (colDescriptor == null) {
// new column family
- colDescriptor = generateColumnFamilyDescriptor(new Pair<>(cf, entry.getValue()), table.getType());
+ colDescriptor = generateColumnFamilyDescriptor(new Pair<>(cf, familyProps), table.getType());
newTableDescriptor.addFamily(colDescriptor);
} else {
- modifyColumnFamilyDescriptor(colDescriptor, entry.getValue());
+ modifyColumnFamilyDescriptor(colDescriptor, familyProps);
+ }
+ if (isOrWillBeTransactional) {
+ checkTransactionalVersionsValue(colDescriptor);
}
}
}
- return newTableDescriptor;
+ return new Pair<>(origTableDescriptor, newTableDescriptor);
+ }
+
+ /**
+ * Rejects a column family whose max-versions setting is one or less.
+ *
+ * @param colDescriptor column family descriptor to validate
+ * @throws SQLException with code TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE, naming the family
+ */
+ private void checkTransactionalVersionsValue(HColumnDescriptor colDescriptor) throws SQLException {
+ if (colDescriptor.getMaxVersions() > 1) {
+ return;
+ }
+ SQLExceptionInfo.Builder errorInfo =
+ new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE);
+ errorInfo.setFamilyName(colDescriptor.getNameAsString());
+ throw errorInfo.build().buildException();
}
private boolean isHColumnProperty(String propName) {
@@ -1775,7 +2114,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private HashSet<String> existingColumnFamiliesForBaseTable(PName baseTableName) throws TableNotFoundException {
synchronized (latestMetaDataLock) {
throwConnectionClosedIfNullMetaData();
- PTable table = latestMetaData.getTable(new PTableKey(null, baseTableName.getString()));
+ PTable table = latestMetaData.getTableRef(new PTableKey(null, baseTableName.getString())).getTable();
latestMetaDataLock.notifyAll();
return existingColumnFamilies(table);
}
@@ -1790,20 +2129,23 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return cfNames;
}
- private int getTTLForEmptyCf(byte[] emptyCf, byte[] tableNameBytes, HTableDescriptor tableDescriptor) throws SQLException {
- if (tableDescriptor == null) {
- tableDescriptor = getTableDescriptor(tableNameBytes);
- }
- return tableDescriptor.getFamily(emptyCf).getTimeToLive();
+ /**
+ * Resolves the TTL to apply: the TTL given by the current statement if there is one,
+ * otherwise the time-to-live of the table's empty column family in the descriptor.
+ *
+ * @param table table whose empty column family is consulted
+ * @param tableDesc descriptor the fallback TTL is read from
+ * @param newTTL TTL being set now, or null if none was specified
+ * @return the TTL value to use
+ */
+ private static int getTTL(PTable table, HTableDescriptor tableDesc, Integer newTTL) throws SQLException {
+ if (newTTL != null) {
+ return newTTL;
+ }
+ return tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getTimeToLive();
}
- private void setTTLToEmptyCFTTL(Map<String, Map<String, Object>> familyProps, PTable table,
- HTableDescriptor tableDesc) throws SQLException {
+ /**
+ * Puts the TTL resolved by getTTL into the property map of every family in familyProps.
+ * Does nothing when familyProps is empty.
+ *
+ * @param familyProps family name to property map; updated in place
+ * @param table table passed to getTTL
+ * @param tableDesc descriptor passed to getTTL
+ * @param newTTL TTL being set now, or null to fall back to the empty column family's TTL
+ */
+ private static void setTTLForNewCFs(Map<String, Map<String, Object>> familyProps, PTable table,
+ HTableDescriptor tableDesc, Integer newTTL) throws SQLException {
if (!familyProps.isEmpty()) {
- int emptyCFTTL = getTTLForEmptyCf(SchemaUtil.getEmptyColumnFamily(table), table.getPhysicalName().getBytes(), tableDesc);
- for (String family : familyProps.keySet()) {
- Map<String, Object> props = familyProps.get(family) != null ? familyProps.get(family) : new HashMap<String, Object>();
- props.put(TTL, emptyCFTTL);
+ int ttl = getTTL(table, tableDesc, newTTL);
+ for (Map.Entry<String, Map<String, Object>> entry : familyProps.entrySet()) {
+ Map<String, Object> props = entry.getValue();
+ if (props == null) {
+ props = new HashMap<String, Object>();
+ // Store the fresh map back into familyProps; otherwise the TTL would be
+ // written to a throwaway map and silently lost for this family.
+ entry.setValue(props);
+ }
+ props.put(TTL, ttl);
}
}
}
@@ -1860,7 +2202,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
// Cannot go through DriverManager or you end up in an infinite loop because it'll call init again
- PhoenixConnection metaConnection = new PhoenixConnection(this, oldMetaConnection.getURL(), props, oldMetaConnection.getMetaDataCache());
+ PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props);
SQLException sqlE = null;
try {
metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD " + (addIfNotExists ? " IF NOT EXISTS " : "") + columns );
@@ -1938,7 +2280,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
String columnsToAdd = "";
if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
// We know that we always need to add the STORE_NULLS column for 4.3 release
- columnsToAdd = ", " + PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName();
+ columnsToAdd += "," + PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName();
HBaseAdmin admin = null;
try {
admin = getAdmin();
@@ -2020,6 +2362,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd, false);
}
+ if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
+ columnsToAdd = PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName();
+ metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, columnsToAdd, false);
+ }
}
int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
@@ -2138,8 +2485,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public boolean hasInvalidIndexConfiguration() {
- return hasInvalidIndexConfiguration;
+ public boolean isMutableIndexWALCodecInstalled() {
+ return isMutableIndexWALCodecInstalled;
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 87fb88e..77e4ba9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -24,9 +24,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -81,6 +83,10 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.SequenceUtil;
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.inmemory.InMemoryTxSystemClient;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -99,18 +105,37 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
private PMetaData metaData;
private final Map<SequenceKey, SequenceInfo> sequenceMap = Maps.newHashMap();
private final String userName;
+ private final TransactionSystemClient txSystemClient;
private KeyValueBuilder kvBuilder;
private volatile boolean initialized;
private volatile SQLException initializationException;
private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap();
- public ConnectionlessQueryServicesImpl(QueryServices queryServices, ConnectionInfo connInfo) {
- super(queryServices);
+ /**
+ * Creates connectionless query services backed by an in-memory Tephra transaction client.
+ * The configuration handed to the TransactionManager is populated from the service
+ * properties, then the optional connection properties, then the connection info
+ * properties, in that order.
+ *
+ * @param services parent query services whose properties seed the configuration
+ * @param connInfo connection info supplying the principal and further properties
+ * @param info optional connection properties; may be null
+ */
+ public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) {
+ super(services);
userName = connInfo.getPrincipal();
metaData = newEmptyMetaData();
// Use KeyValueBuilder that builds real KeyValues, as our test utils require this
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
+ Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ for (Entry<String,String> entry : services.getProps()) {
+ config.set(entry.getKey(), entry.getValue());
+ }
+ if (info != null) {
+ // stringPropertyNames() yields only String keys mapped to String values (including
+ // defaults), so no unchecked casts are needed and no null value reaches config.set.
+ for (String key : info.stringPropertyNames()) {
+ config.set(key, info.getProperty(key));
+ }
+ }
+ for (Entry<String,String> entry : connInfo.asProps()) {
+ config.set(entry.getKey(), entry.getValue());
+ }
+
+ // Without making a copy of the configuration we cons up, we lose some of our properties
+ // on the server side during testing.
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
+ TransactionManager txnManager = new TransactionManager(config);
+ this.txSystemClient = new InMemoryTxSystemClient(txnManager);
}
private PMetaData newEmptyMetaData() {
@@ -141,14 +166,19 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public PMetaData addTable(PTable table) throws SQLException {
- return metaData = metaData.addTable(table);
+ public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
+ return metaData = metaData.addTable(table, resolvedTime);
+ }
+
+ @Override
+ public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+ return metaData = metaData.updateResolvedTimestamp(table, resolvedTimestamp);
}
@Override
public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
- long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
- return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
+ long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
+ return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
}
@Override
@@ -159,8 +189,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
@Override
public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
- long tableSeqNum) throws SQLException {
- return metaData = metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum);
+ long tableSeqNum, long resolvedTime) throws SQLException {
+ return metaData = metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
}
@@ -175,7 +205,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
// to get anything from the server (since we don't have a connection)
try {
String fullTableName = SchemaUtil.getTableName(schemaBytes, tableBytes);
- PTable table = metaData.getTable(new PTableKey(tenantId, fullTableName));
+ PTable table = metaData.getTableRef(new PTableKey(tenantId, fullTableName)).getTable();
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, table, true);
} catch (TableNotFoundException e) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 0, null);
@@ -345,7 +375,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
String indexName = Bytes.toString(rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
String indexTableName = SchemaUtil.getTableName(schemaName, indexName);
PName tenantId = tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes);
- PTable index = metaData.getTable(new PTableKey(tenantId, indexTableName));
+ PTable index = metaData.getTableRef(new PTableKey(tenantId, indexTableName)).getTable();
index = PTableImpl.makePTable(index,newState == PIndexState.USABLE ? PIndexState.ACTIVE : newState == PIndexState.UNUSABLE ? PIndexState.INACTIVE : newState);
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, index);
}
@@ -360,8 +390,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public boolean hasInvalidIndexConfiguration() {
- return false;
+ public boolean isMutableIndexWALCodecInstalled() {
+ return true;
}
@Override
@@ -492,6 +522,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
+ public TransactionSystemClient getTransactionSystemClient() {
+ return txSystemClient;
+ }
+
public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
throws SQLException {
return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 4153652..4952355 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -46,6 +46,8 @@ import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.stats.PTableStats;
+import co.cask.tephra.TransactionSystemClient;
+
public class DelegateConnectionQueryServices extends DelegateQueryServices implements ConnectionQueryServices {
@@ -74,14 +76,19 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
}
@Override
- public PMetaData addTable(PTable table) throws SQLException {
- return getDelegate().addTable(table);
+ public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
+ return getDelegate().addTable(table, resolvedTime);
+ }
+
+ @Override
+ public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+ return getDelegate().updateResolvedTimestamp(table, resolvedTimestamp);
}
@Override
public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
- long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
- return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
+ long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
+ return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
}
@Override
@@ -92,8 +99,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
@Override
public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
- long tableSeqNum) throws SQLException {
- return getDelegate().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum);
+ long tableSeqNum, long resolvedTime) throws SQLException {
+ return getDelegate().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
}
@Override
@@ -165,8 +172,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
}
@Override
- public boolean hasInvalidIndexConfiguration() {
- return getDelegate().hasInvalidIndexConfiguration();
+ public boolean isMutableIndexWALCodecInstalled() {
+ return getDelegate().isMutableIndexWALCodecInstalled();
}
@Override
@@ -254,6 +261,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
}
@Override
+ public TransactionSystemClient getTransactionSystemClient() {
+ return getDelegate().getTransactionSystemClient();
+ }
+
public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
throws SQLException {
return getDelegate().createFunction(functionData, function, temporary);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
index 76e7593..8e7a70d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
@@ -35,10 +35,11 @@ import org.apache.phoenix.schema.PTable;
* @since 0.1
*/
public interface MetaDataMutated {
- PMetaData addTable(PTable table) throws SQLException;
+ PMetaData addTable(PTable table, long resolvedTime) throws SQLException;
+ PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException;
PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException;
- PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException;
- PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException;
+ PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException;
+ PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException;
PMetaData addFunction(PFunction function) throws SQLException;
PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 268bfc1..1cac37f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -96,6 +96,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
@@ -112,6 +113,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
@@ -172,9 +174,15 @@ public interface QueryConstants {
public final static int MILLIS_IN_DAY = 1000 * 60 * 60 * 24;
public static final String EMPTY_COLUMN_NAME = "_0";
+ // For transactional tables, the value of our empty key value can no longer be empty
+ // since empty values are treated as column delete markers.
public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME);
public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
EMPTY_COLUMN_BYTES);
+ public final static String EMPTY_COLUMN_VALUE = "x";
+ public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
+ public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
+ EMPTY_COLUMN_VALUE_BYTES);
public static final String DEFAULT_COLUMN_FAMILY = "0";
public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
@@ -251,13 +259,15 @@ public interface QueryConstants {
STORE_NULLS + " BOOLEAN," +
BASE_COLUMN_COUNT + " INTEGER," +
// Column metadata (will be null for table row)
- IS_ROW_TIMESTAMP + " BOOLEAN, " +
+ IS_ROW_TIMESTAMP + " BOOLEAN, " +
+ TRANSACTIONAL + " BOOLEAN," +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
+ TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
// Install split policy to prevent a tenant's metadata from being split across regions.
- HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
public static final String CREATE_STATS_TABLE_METADATA =
"CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" +
@@ -278,7 +288,8 @@ public interface QueryConstants {
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_STAT_DATA_VERSIONS + ",\n" +
HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
// Install split policy to prevent a physical table's stats from being split across regions.
- HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
public static final String CREATE_SEQUENCE_METADATA =
"CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" +
@@ -296,7 +307,8 @@ public interface QueryConstants {
LIMIT_REACHED_FLAG + " BOOLEAN \n" +
" CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
- HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n";
+ HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
public static final String CREATE_FUNCTION_METADATA =
"CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"(\n" +
@@ -320,6 +332,7 @@ public interface QueryConstants {
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+
// Install split policy to prevent a tenant's metadata from being split across regions.
- HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index df8cdc3..5f42ff8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -159,6 +159,7 @@ public interface QueryServices extends SQLCloseable {
public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay";
public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
+ public static final String DEFAULT_TRANSACTIONAL_ATTRIB = "phoenix.transactions.default.enabled";
public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
// rpc queue configs
@@ -172,6 +173,8 @@ public interface QueryServices extends SQLCloseable {
public static final String RETURN_SEQUENCE_VALUES_ATTRIB = "phoenix.sequence.returnValues";
public static final String EXTRA_JDBC_ARGUMENTS_ATTRIB = "phoenix.jdbc.extra.arguments";
+ public static final String MAX_VERSIONS_TRANSACTIONAL_ATTRIB = "phoenix.transactions.maxVersions";
+
// queryserver configuration keys
public static final String QUERY_SERVER_SERIALIZATION_ATTRIB = "phoenix.queryserver.serialization";
public static final String QUERY_SERVER_META_FACTORY_ATTRIB = "phoenix.queryserver.metafactory.class";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0434ac9..8c2f895 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -116,7 +116,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = DEFAULT_MAX_QUERY_CONCURRENCY;
public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
- public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
+ public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3;
public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
// Only the first chunked batches are fetched in parallel, so this default
// should be on the relatively bigger side of things. Bigger means more
@@ -190,7 +190,9 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_STORE_NULLS = false;
// TODO Change this to true as part of PHOENIX-1543
+ // We'll also need this for transactions to work correctly
public static final boolean DEFAULT_AUTO_COMMIT = false;
+ public static final boolean DEFAULT_TRANSACTIONAL = false;
public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
@@ -202,6 +204,7 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false;
public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false;
public static final boolean DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE = true;
+ public static final int DEFAULT_MAX_VERSIONS_TRANSACTIONAL = Integer.MAX_VALUE;
public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false;
public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = "";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 5a0d63b..7fb90a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -162,8 +162,8 @@ public class DelegateTable implements PTable {
}
@Override
- public void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
- delegate.getIndexMaintainers(ptr, connection);
+ public boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
+ return delegate.getIndexMaintainers(ptr, connection);
}
@Override
@@ -238,6 +238,11 @@ public class DelegateTable implements PTable {
}
@Override
+ public boolean isTransactional() {
+ return delegate.isTransactional();
+ }
+
+ @Override
public int getBaseColumnCount() {
return delegate.getBaseColumnCount();
}