You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:43:40 UTC
[02/52] [abbrv] phoenix git commit: PHOENIX-1674 Snapshot isolation
transaction support through Tephra (James Taylor, Thomas D'Silva)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 21548b0..cb9b831 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -72,6 +72,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
@@ -169,6 +170,7 @@ import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.parse.UpdateStatisticsStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.ConnectionQueryServices.Feature;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -193,10 +195,13 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TransactionUtil;
import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import co.cask.tephra.TxConstants;
+
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
import com.google.common.collect.ListMultimap;
@@ -230,8 +235,9 @@ public class MetaDataClient {
VIEW_INDEX_ID + "," +
INDEX_TYPE + "," +
STORE_NULLS + "," +
- BASE_COLUMN_COUNT +
- ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ BASE_COLUMN_COUNT + "," +
+ TRANSACTIONAL +
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
private static final String CREATE_LINK =
"UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +
@@ -372,7 +378,7 @@ public class MetaDataClient {
MetaDataMutationResult result = updateCache(schemaName, tableName, true);
return result.getMutationTime();
}
-
+
/**
* Update the cache with the latest as of the connection scn.
* @param schemaName
@@ -391,6 +397,10 @@ public class MetaDataClient {
public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName) throws SQLException {
return updateCache(tenantId, schemaName, tableName, false);
}
+
+ public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
+ // No caller-supplied resolved timestamp: hand null to the five-argument overload, which derives one itself.
+ final Long unspecifiedResolvedTimestamp = null;
+ return updateCache(tenantId, schemaName, tableName, alwaysHitServer, unspecifiedResolvedTimestamp);
+ }
/**
* Update the cache with the latest as of the connection scn.
@@ -415,24 +425,47 @@ public class MetaDataClient {
long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
return clientTimeStamp;
}
-
+
+ private long getCurrentScn() {
+ // Use the connection's SCN when one is set; otherwise fall back to HConstants.LATEST_TIMESTAMP.
+ // NOTE(review): this looks identical to the existing client-timestamp helper just above - confirm it is needed.
+ final Long connectionScn = connection.getSCN();
+ return connectionScn != null ? connectionScn : HConstants.LATEST_TIMESTAMP;
+ }
+
private MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName,
- boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] herez
- long clientTimeStamp = getClientTimeStamp();
+ boolean alwaysHitServer, Long resolvedTimestamp) throws SQLException { // TODO: pass byte[] herez
boolean systemTable = SYSTEM_CATALOG_SCHEMA.equals(schemaName);
// System tables must always have a null tenantId
tenantId = systemTable ? null : tenantId;
PTable table = null;
+ PTableRef tableRef = null;
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
long tableTimestamp = HConstants.LATEST_TIMESTAMP;
+ long tableResolvedTimestamp = HConstants.LATEST_TIMESTAMP;
try {
- table = connection.getTable(new PTableKey(tenantId, fullTableName));
+ tableRef = connection.getTableRef(new PTableKey(tenantId, fullTableName));
+ table = tableRef.getTable();
tableTimestamp = table.getTimeStamp();
+ tableResolvedTimestamp = tableRef.getResolvedTimeStamp();
} catch (TableNotFoundException e) {
}
- // Don't bother with server call: we can't possibly find a newer table
- if (table != null && !alwaysHitServer && (systemTable || tableTimestamp == clientTimeStamp - 1)) {
- return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,QueryConstants.UNSET_TIMESTAMP,table);
+
+ boolean defaultTransactional = connection.getQueryServices().getProps().getBoolean(
+ QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB,
+ QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+ // start a txn if all tables are transactional by default, or if we found the table in the cache and it is transactional
+ // TODO if system tables become transactional remove the check
+ boolean isTransactional = defaultTransactional || (table!=null && table.isTransactional());
+ if (!systemTable && isTransactional && !connection.getMutationState().isTransactionStarted()) {
+ connection.getMutationState().startTransaction();
+ }
+ resolvedTimestamp = resolvedTimestamp==null ? TransactionUtil.getResolvedTimestamp(connection, isTransactional, HConstants.LATEST_TIMESTAMP) : resolvedTimestamp;
+ // Do not make rpc to getTable if
+ // 1. table is a system table
+ // 2. table was already resolved as of that timestamp
+ if (table != null && !alwaysHitServer
+ && (systemTable || resolvedTimestamp == tableResolvedTimestamp)) {
+ return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table);
}
int maxTryCount = tenantId == null ? 1 : 2;
@@ -442,7 +475,12 @@ public class MetaDataClient {
do {
final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName);
final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName);
- result = connection.getQueryServices().getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, clientTimeStamp);
+ ConnectionQueryServices queryServices = connection.getQueryServices();
+ result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, resolvedTimestamp);
+ // if the table was assumed to be transactional, but is actually not transactional then re-resolve as of the right timestamp (and vice versa)
+ if (table==null && result.getTable()!=null && result.getTable().isTransactional()!=isTransactional) {
+ result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, TransactionUtil.getResolvedTimestamp(connection, result.getTable().isTransactional(), HConstants.LATEST_TIMESTAMP));
+ }
if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) {
return result;
@@ -466,15 +504,20 @@ public class MetaDataClient {
// timestamp, we can handle this such that we don't ask the
// server again.
if (table != null) {
+ // Ensures that table in result is set to table found in our cache.
if (code == MutationCode.TABLE_ALREADY_EXISTS) {
- // Ensures that table in result is set to table found in our cache.
- result.setTable(table);
- // Although this table is up-to-date, the parent table may not be.
- // In this case, we update the parent table which may in turn pull
- // in indexes to add to this table.
- if (addIndexesFromPhysicalTable(result)) {
- connection.addTable(result.getTable());
- }
+ result.setTable(table);
+ // Although this table is up-to-date, the parent table may not be.
+ // In this case, we update the parent table which may in turn pull
+ // in indexes to add to this table.
+ long resolvedTime = TransactionUtil.getResolvedTime(connection, result);
+ if (addIndexesFromPhysicalTable(result, resolvedTimestamp)) {
+ connection.addTable(result.getTable(), resolvedTime);
+ }
+ else {
+ // if we aren't adding the table, we still need to update the resolved time of the table
+ connection.updateResolvedTimestamp(table, resolvedTime);
+ }
return result;
}
// If table was not found at the current time stamp and we have one cached, remove it.
@@ -566,10 +609,11 @@ public class MetaDataClient {
* of the table for which we just updated.
* TODO: combine this round trip with the one that updates the cache for the child table.
* @param result the result from updating the cache for the current table.
+ * @param resolvedTimestamp timestamp at which child table was resolved
* @return true if the PTable contained by result was modified and false otherwise
* @throws SQLException if the physical table cannot be found
*/
- private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result) throws SQLException {
+ private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result, Long resolvedTimestamp) throws SQLException {
PTable table = result.getTable();
// If not a view or if a view directly over an HBase table, there's nothing to do
if (table.getType() != PTableType.VIEW || table.getViewType() == ViewType.MAPPED) {
@@ -578,7 +622,7 @@ public class MetaDataClient {
String physicalName = table.getPhysicalName().getString();
String schemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
String tableName = SchemaUtil.getTableNameFromFullName(physicalName);
- MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false);
+ MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false, resolvedTimestamp);
PTable physicalTable = parentResult.getTable();
if (physicalTable == null) {
throw new TableNotFoundException(schemaName, tableName);
@@ -782,7 +826,7 @@ public class MetaDataClient {
public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, byte[][] viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
PTable table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, null, null);
- if (table == null || table.getType() == PTableType.VIEW) {
+ if (table == null || table.getType() == PTableType.VIEW || table.isTransactional()) {
return new MutationState(0,connection);
}
// Hack to get around the case when an SCN is specified on the connection.
@@ -1137,7 +1181,7 @@ public class MetaDataClient {
if (hbaseVersion < PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setTableName(indexTableName.getTableName()).build().buildException();
}
- if (connection.getQueryServices().hasInvalidIndexConfiguration()) {
+ if (!connection.getQueryServices().isMutableIndexWALCodecInstalled() && !dataTable.isTransactional()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setTableName(indexTableName.getTableName()).build().buildException();
}
}
@@ -1268,19 +1312,19 @@ public class MetaDataClient {
// as there's no need to burn another sequence value.
if (allocateIndexId && indexId == null) {
Long scn = connection.getSCN();
- long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
PName tenantId = connection.getTenantId();
String tenantIdStr = tenantId == null ? null : connection.getTenantId().getString();
PName physicalName = dataTable.getPhysicalName();
int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, nSequenceSaltBuckets);
- // Create at parent timestamp as we know that will be earlier than now
- // and earlier than any SCN if one is set.
+ // if scn is set create at scn-1, so we can see the sequence or else use latest timestamp (so that latest server time is used)
+ long sequenceTimestamp = scn!=null ? scn-1 : HConstants.LATEST_TIMESTAMP;
createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
true, Short.MIN_VALUE, 1, 1, false, Long.MIN_VALUE, Long.MAX_VALUE,
- dataTable.getTimeStamp());
+ sequenceTimestamp);
long[] seqValues = new long[1];
SQLException[] sqlExceptions = new SQLException[1];
+ long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
Math.max(timestamp, dataTable.getTimeStamp()), seqValues, sqlExceptions);
if (sqlExceptions[0] != null) {
@@ -1397,7 +1441,7 @@ public class MetaDataClient {
functionUpsert.setString(5, function.getJarPath());
functionUpsert.setString(6, function.getReturnType());
functionUpsert.execute();
- functionData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ functionData.addAll(connection.getMutationState().toMutations(null).next().getSecond());
connection.rollback();
MetaDataMutationResult result = connection.getQueryServices().createFunction(functionData, function, stmt.isTemporary());
MutationCode code = result.getMutationCode();
@@ -1498,29 +1542,36 @@ public class MetaDataClient {
long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
boolean multiTenant = false;
boolean storeNulls = false;
+ boolean transactional = false;
Integer saltBucketNum = null;
String defaultFamilyName = null;
boolean isImmutableRows = false;
List<PName> physicalNames = Collections.emptyList();
boolean addSaltColumn = false;
boolean rowKeyOrderOptimizable = true;
+ Long timestamp = null;
if (parent != null && tableType == PTableType.INDEX) {
- // Index on view
- // TODO: Can we support a multi-tenant index directly on a multi-tenant
- // table instead of only a view? We don't have anywhere to put the link
- // from the table to the index, though.
- if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) {
- PName physicalName = parent.getPhysicalName();
- saltBucketNum = parent.getBucketNum();
- addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL);
- defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
- if (indexType == IndexType.LOCAL) {
- saltBucketNum = null;
- // Set physical name of local index table
- physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes())));
- } else {
- // Set physical name of view index table
- physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes())));
+ transactional = parent.isTransactional();
+ timestamp = TransactionUtil.getTableTimestamp(connection, transactional);
+ storeNulls = parent.getStoreNulls();
+ if (tableType == PTableType.INDEX) {
+ // Index on view
+ // TODO: Can we support a multi-tenant index directly on a multi-tenant
+ // table instead of only a view? We don't have anywhere to put the link
+ // from the table to the index, though.
+ if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) {
+ PName physicalName = parent.getPhysicalName();
+ saltBucketNum = parent.getBucketNum();
+ addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL);
+ defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
+ if (indexType == IndexType.LOCAL) {
+ saltBucketNum = null;
+ // Set physical name of local index table
+ physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes())));
+ } else {
+ // Set physical name of view index table
+ physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes())));
+ }
}
}
@@ -1536,7 +1587,7 @@ public class MetaDataClient {
incrementStatement.execute();
// Get list of mutations and add to table meta data that will be passed to server
// to guarantee order. This row will always end up last
- tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
connection.rollback();
// Add row linking from data table row to index table row
@@ -1561,12 +1612,10 @@ public class MetaDataClient {
}
Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
- Map<String,Object> commonFamilyProps = Collections.emptyMap();
+ Map<String,Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
// Somewhat hacky way of determining if property is for HColumnDescriptor or HTableDescriptor
HColumnDescriptor defaultDescriptor = new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
if (!statement.getProps().isEmpty()) {
- commonFamilyProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
-
Collection<Pair<String,Object>> props = statement.getProps().get(QueryConstants.ALL_FAMILY_PROPERTIES_KEY);
for (Pair<String,Object> prop : props) {
if (defaultDescriptor.getValue(prop.getFirst()) == null) {
@@ -1580,7 +1629,7 @@ public class MetaDataClient {
// Although unusual, it's possible to set a mapped VIEW as having immutable rows.
// This tells Phoenix that you're managing the index maintenance yourself.
if (tableType != PTableType.INDEX && (tableType != PTableType.VIEW || viewType == ViewType.MAPPED)) {
- Boolean isImmutableRowsProp = (Boolean) tableProps.remove(PTable.IS_IMMUTABLE_ROWS_PROP_NAME);
+ Boolean isImmutableRowsProp = (Boolean) tableProps.get(PTable.IS_IMMUTABLE_ROWS_PROP_NAME);
if (isImmutableRowsProp == null) {
isImmutableRows = connection.getQueryServices().getProps().getBoolean(QueryServices.IMMUTABLE_ROWS_ATTRIB, QueryServicesOptions.DEFAULT_IMMUTABLE_ROWS);
} else {
@@ -1590,7 +1639,7 @@ public class MetaDataClient {
// Can't set any of these on views or shared indexes on views
if (tableType != PTableType.VIEW && indexId == null) {
- saltBucketNum = (Integer) tableProps.remove(PhoenixDatabaseMetaData.SALT_BUCKETS);
+ saltBucketNum = (Integer) tableProps.get(PhoenixDatabaseMetaData.SALT_BUCKETS);
if (saltBucketNum != null) {
if (saltBucketNum < 0 || saltBucketNum > SaltingUtil.MAX_BUCKET_NUM) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_BUCKET_NUM).build().buildException();
@@ -1607,42 +1656,110 @@ public class MetaDataClient {
addSaltColumn = (saltBucketNum != null);
}
- boolean removedProp = false;
// Can't set MULTI_TENANT or DEFAULT_COLUMN_FAMILY_NAME on an INDEX or a non mapped VIEW
if (tableType != PTableType.INDEX && (tableType != PTableType.VIEW || viewType == ViewType.MAPPED)) {
- Boolean multiTenantProp = (Boolean) tableProps.remove(PhoenixDatabaseMetaData.MULTI_TENANT);
+ Boolean multiTenantProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.MULTI_TENANT);
multiTenant = Boolean.TRUE.equals(multiTenantProp);
- defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);
- removedProp = (defaultFamilyName != null);
+ defaultFamilyName = (String)tableProps.get(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);
}
boolean disableWAL = false;
- Boolean disableWALProp = (Boolean) tableProps.remove(PhoenixDatabaseMetaData.DISABLE_WAL);
+ Boolean disableWALProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.DISABLE_WAL);
if (disableWALProp != null) {
disableWAL = disableWALProp;
}
- Boolean storeNullsProp = (Boolean) tableProps.remove(PhoenixDatabaseMetaData.STORE_NULLS);
- storeNulls = storeNullsProp == null
- ? connection.getQueryServices().getProps().getBoolean(
- QueryServices.DEFAULT_STORE_NULLS_ATTRIB,
- QueryServicesOptions.DEFAULT_STORE_NULLS)
- : storeNullsProp;
+ Boolean storeNullsProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.STORE_NULLS);
+ if (storeNullsProp == null) {
+ if (parent == null) {
+ storeNulls = connection.getQueryServices().getProps().getBoolean(
+ QueryServices.DEFAULT_STORE_NULLS_ATTRIB,
+ QueryServicesOptions.DEFAULT_STORE_NULLS);
+ tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.valueOf(storeNulls));
+ }
+ } else {
+ storeNulls = storeNullsProp;
+ }
+ Boolean transactionalProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.TRANSACTIONAL);
+ if (transactionalProp != null && parent != null) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL)
+ .setSchemaName(schemaName).setTableName(tableName)
+ .build().buildException();
+ }
+ if (parent == null) {
+ if (transactionalProp == null) {
+ transactional = connection.getQueryServices().getProps().getBoolean(
+ QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB,
+ QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+ } else {
+ transactional = transactionalProp;
+ }
+ }
+ tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, transactional);
+ if (transactional) {
+ // If TTL set, use Tephra TTL property name instead
+ Object ttl = commonFamilyProps.remove(HColumnDescriptor.TTL);
+ if (ttl != null) {
+ commonFamilyProps.put(TxConstants.PROPERTY_TTL, ttl);
+ }
+ }
+
+ boolean sharedTable = statement.getTableType() == PTableType.VIEW || indexId != null;
+ if (transactional) {
+ // Tephra uses an empty value cell as its delete marker, so we need to turn on
+ // storeNulls for transactional tables.
+ // If we use regular column delete markers (which is what non transactional tables
+ // use), then they get converted
+ // on the server, but this can mess up our secondary index code as the changes get
+ // committed prior to the
+ // maintenance code being able to see the prior state to update the rows correctly.
+ if (Boolean.FALSE.equals(storeNullsProp)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL)
+ .setSchemaName(schemaName).setTableName(tableName)
+ .build().buildException();
+ }
+ // Force STORE_NULLS to true when transactional as Tephra cannot deal with column deletes
+ storeNulls = true;
+ tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.TRUE);
+
+ if (!sharedTable) {
+ Integer maxVersionsProp = (Integer) commonFamilyProps.get(HConstants.VERSIONS);
+ if (maxVersionsProp == null) {
+ if (parent != null) {
+ HTableDescriptor desc = connection.getQueryServices().getTableDescriptor(parent.getPhysicalName().getBytes());
+ if (desc != null) {
+ maxVersionsProp = desc.getFamily(SchemaUtil.getEmptyColumnFamily(parent)).getMaxVersions();
+ }
+ }
+ if (maxVersionsProp == null) {
+ maxVersionsProp = connection.getQueryServices().getProps().getInt(
+ QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL);
+ }
+ commonFamilyProps.put(HConstants.VERSIONS, maxVersionsProp);
+ }
+ }
+ }
+ timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional) : timestamp;
// Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
- if ((statement.getTableType() == PTableType.VIEW || indexId != null) && !tableProps.isEmpty()) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
- .buildException();
- }
- if (removedProp) {
- tableProps.put(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME, defaultFamilyName);
+ if (sharedTable) {
+ if (tableProps.get(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME) != null) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE)
+ .setSchemaName(schemaName).setTableName(tableName)
+ .build().buildException();
+ }
+ if (SchemaUtil.hasHTableDescriptorProps(tableProps)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
+ .buildException();
+ }
}
List<ColumnDef> colDefs = statement.getColumnDefs();
List<PColumn> columns;
LinkedHashSet<PColumn> pkColumns;
- if (tenantId != null && (tableType != PTableType.VIEW && indexId == null)) {
+ if (tenantId != null && !sharedTable) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE)
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
@@ -1866,8 +1983,8 @@ public class MetaDataClient {
Collections.<PTable>emptyList(), isImmutableRows,
Collections.<PName>emptyList(), defaultFamilyName == null ? null :
PNameFactory.newName(defaultFamilyName), null,
- Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true);
- connection.addTable(table);
+ Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false);
+ connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
} else if (tableType == PTableType.INDEX && indexId == null) {
if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
int nIndexRowKeyColumns = isPK ? 1 : pkColumnsNames.size();
@@ -1922,7 +2039,7 @@ public class MetaDataClient {
addColumnMutation(schemaName, tableName, column, colUpsert, parentTableName, pkName, keySeq, saltBucketNum != null);
}
- tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
connection.rollback();
String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString();
@@ -1968,9 +2085,10 @@ public class MetaDataClient {
} else {
tableUpsert.setInt(20, BASE_TABLE_BASE_COLUMN_COUNT);
}
+ tableUpsert.setBoolean(21, transactional);
tableUpsert.execute();
- tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
connection.rollback();
/*
@@ -1996,7 +2114,9 @@ public class MetaDataClient {
MutationCode code = result.getMutationCode();
switch(code) {
case TABLE_ALREADY_EXISTS:
- addTableToCache(result);
+ if (result.getTable() != null) { // Can happen for transactional table that already exists as HBase table
+ addTableToCache(result);
+ }
if (!statement.ifNotExists()) {
throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
}
@@ -2016,11 +2136,11 @@ public class MetaDataClient {
default:
PName newSchemaName = PNameFactory.newName(schemaName);
PTable table = PTableImpl.makePTable(
- tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(),
+ tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, timestamp!=null ? timestamp : result.getMutationTime(),
PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
- indexId, indexType, rowKeyOrderOptimizable);
+ indexId, indexType, rowKeyOrderOptimizable, transactional);
result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
addTableToCache(result);
return table;
@@ -2175,6 +2295,8 @@ public class MetaDataClient {
MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade);
MutationCode code = result.getMutationCode();
+ PTable table = result.getTable();
+ boolean transactional = table!=null && table.isTransactional();
switch (code) {
case TABLE_NOT_FOUND:
if (!ifExists) { throw new TableNotFoundException(schemaName, tableName); }
@@ -2186,12 +2308,10 @@ public class MetaDataClient {
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
default:
- connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName,
- result.getMutationTime());
+ connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, result.getMutationTime());
if (result.getTable() != null && tableType != PTableType.VIEW) {
connection.setAutoCommit(true);
- PTable table = result.getTable();
boolean dropMetaData = result.getTable().getViewIndexId() == null &&
connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
long ts = (scn == null ? result.getMutationTime() : scn);
@@ -2334,12 +2454,12 @@ public class MetaDataClient {
return mutationCode;
}
- private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta) throws SQLException {
- return incrementTableSeqNum(table, expectedType, columnCountDelta, null, null, null, null);
+ private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional) throws SQLException {
+ return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, null, null, null);
}
private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
- Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant, Boolean storeNulls)
+ Boolean isTransactional, Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant, Boolean storeNulls)
throws SQLException {
String schemaName = table.getSchemaName().getString();
String tableName = table.getTableName().getString();
@@ -2371,6 +2491,9 @@ public class MetaDataClient {
if (storeNulls != null) {
mutateBooleanProperty(tenantId, schemaName, tableName, STORE_NULLS, storeNulls);
}
+ if (isTransactional != null) {
+ mutateBooleanProperty(tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional);
+ }
return seqNum;
}
@@ -2404,10 +2527,13 @@ public class MetaDataClient {
Boolean multiTenantProp = null;
Boolean disableWALProp = null;
Boolean storeNullsProp = null;
+ Boolean isTransactionalProp = null;
ListMultimap<String,Pair<String,Object>> stmtProperties = statement.getProps();
Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size());
- PTable table = FromCompiler.getResolver(statement, connection).getTables().get(0).getTable();
+ TableRef tableRef = FromCompiler.getResolver(statement, connection).getTables().get(0);
+ PTable table = tableRef.getTable();
+ Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null;
List<ColumnDef> columnDefs = statement.getColumnDefs();
if (columnDefs == null) {
columnDefs = Collections.emptyList();
@@ -2426,6 +2552,8 @@ public class MetaDataClient {
disableWALProp = (Boolean)prop.getSecond();
} else if (propName.equals(STORE_NULLS)) {
storeNullsProp = (Boolean)prop.getSecond();
+ } else if (propName.equals(TRANSACTIONAL)) {
+ isTransactionalProp = (Boolean)prop.getSecond();
}
}
}
@@ -2433,6 +2561,7 @@ public class MetaDataClient {
}
boolean retried = false;
boolean changingPhoenixTableProperty = false;
+ boolean nonTxToTx = false;
while (true) {
ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
table = resolver.getTables().get(0).getTable();
@@ -2487,6 +2616,23 @@ public class MetaDataClient {
changingPhoenixTableProperty = true;
}
}
+ Boolean isTransactional = null;
+ if (isTransactionalProp != null) {
+ if (isTransactionalProp.booleanValue() != table.isTransactional()) {
+ isTransactional = isTransactionalProp;
+ // We can only go one way: from non transactional to transactional
+ // Going the other way would require rewriting the cell timestamps
+ // and doing a major compaction to get rid of any Tephra specific
+ // delete markers.
+ if (!isTransactional) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ }
+ timeStamp = TransactionUtil.getTableTimestamp(connection, isTransactional);
+ changingPhoenixTableProperty = true;
+ nonTxToTx = true;
+ }
+ }
int numPkColumnsAdded = 0;
PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN_ALTER_TABLE);
@@ -2558,7 +2704,7 @@ public class MetaDataClient {
}
}
- columnMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ columnMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
} else {
// Check that HBase configured properly for mutable secondary indexing
@@ -2567,10 +2713,12 @@ public class MetaDataClient {
if (Boolean.FALSE.equals(isImmutableRows) && !table.getIndexes().isEmpty()) {
int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion();
if (hbaseVersion < PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
- if (connection.getQueryServices().hasInvalidIndexConfiguration()) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ if (connection.getQueryServices().isMutableIndexWALCodecInstalled() && !table.isTransactional()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
}
if (Boolean.TRUE.equals(multiTenant)) {
@@ -2578,17 +2726,17 @@ public class MetaDataClient {
}
}
- if (numPkColumnsAdded>0 && !table.getIndexes().isEmpty()) {
+ if (!table.getIndexes().isEmpty() && (numPkColumnsAdded>0 || nonTxToTx)) {
for (PTable index : table.getIndexes()) {
- incrementTableSeqNum(index, index.getType(), numPkColumnsAdded);
+ incrementTableSeqNum(index, index.getType(), numPkColumnsAdded, nonTxToTx ? Boolean.TRUE : null);
}
- tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
}
long seqNum = table.getSequenceNumber();
if (changingPhoenixTableProperty || columnDefs.size() > 0) {
- seqNum = incrementTableSeqNum(table, statement.getTableType(), columnDefs.size(), isImmutableRows, disableWAL, multiTenant, storeNulls);
- tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ seqNum = incrementTableSeqNum(table, statement.getTableType(), columnDefs.size(), isTransactional, isImmutableRows, disableWAL, multiTenant, storeNulls);
+ tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
}
@@ -2626,10 +2774,22 @@ public class MetaDataClient {
return new MutationState(0,connection);
}
- // Only update client side cache if we aren't adding a PK column to a table with indexes.
+ // Only update client side cache if we aren't adding a PK column to a table with indexes or
+ // transitioning a table from non transactional to transactional.
// We could update the cache manually then too, it'd just be a pain.
- if (numPkColumnsAdded==0 || table.getIndexes().isEmpty()) {
- connection.addColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName), columns, result.getMutationTime(), seqNum, isImmutableRows == null ? table.isImmutableRows() : isImmutableRows, disableWAL == null ? table.isWALDisabled() : disableWAL, multiTenant == null ? table.isMultiTenant() : multiTenant, storeNulls == null ? table.getStoreNulls() : storeNulls);
+ if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 && !nonTxToTx)) {
+ connection.addColumn(
+ tenantId,
+ SchemaUtil.getTableName(schemaName, tableName),
+ columns,
+ result.getMutationTime(),
+ seqNum,
+ isImmutableRows == null ? table.isImmutableRows() : isImmutableRows,
+ disableWAL == null ? table.isWALDisabled() : disableWAL,
+ multiTenant == null ? table.isMultiTenant() : multiTenant,
+ storeNulls == null ? table.getStoreNulls() : storeNulls,
+ isTransactional == null ? table.isTransactional() : isTransactional,
+ TransactionUtil.getResolvedTime(connection, result));
}
// Delete rows in view index if we haven't dropped it already
// We only need to do this if the multiTenant transitioned to false
@@ -2760,12 +2920,12 @@ public class MetaDataClient {
boolean retried = false;
while (true) {
final ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
- PTable table = resolver.getTables().get(0).getTable();
+ TableRef tableRef = resolver.getTables().get(0);
+ PTable table = tableRef.getTable();
List<ColumnName> columnRefs = statement.getColumnRefs();
if(columnRefs == null) {
columnRefs = Lists.newArrayListWithCapacity(0);
}
- TableRef tableRef = null;
List<ColumnRef> columnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size() + table.getIndexes().size());
List<TableRef> indexesToDrop = Lists.newArrayListWithExpectedSize(table.getIndexes().size());
List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize((table.getIndexes().size() + 1) * (1 + table.getColumns().size() - columnRefs.size()));
@@ -2781,14 +2941,13 @@ public class MetaDataClient {
}
throw e;
}
- tableRef = columnRef.getTableRef();
PColumn columnToDrop = columnRef.getColumn();
tableColumnsToDrop.add(columnToDrop);
if (SchemaUtil.isPKColumn(columnToDrop)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_DROP_PK)
.setColumnName(columnToDrop.getName().getString()).build().buildException();
}
- columnsToDrop.add(new ColumnRef(tableRef, columnToDrop.getPosition()));
+ columnsToDrop.add(new ColumnRef(columnRef.getTableRef(), columnToDrop.getPosition()));
}
dropColumnMutations(table, tableColumnsToDrop, tableMetaData);
@@ -2810,16 +2969,17 @@ public class MetaDataClient {
}
}
if(!indexColumnsToDrop.isEmpty()) {
- incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size());
+ incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null);
dropColumnMutations(index, indexColumnsToDrop, tableMetaData);
}
}
- tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null;
+ tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
- long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size());
- tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size(), null);
+ tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
// Force table header to be first in list
Collections.reverse(tableMetaData);
@@ -2870,7 +3030,7 @@ public class MetaDataClient {
// client-side cache as it would be too painful. Just let it pull it over from
// the server when needed.
if (tableColumnsToDrop.size() > 0 && indexesToDrop.isEmpty()) {
- connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum);
+ connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result));
}
// If we have a VIEW, then only delete the metadata, and leave the table data alone
if (table.getType() != PTableType.VIEW) {
@@ -2955,7 +3115,8 @@ public class MetaDataClient {
tableUpsert.close();
}
}
- List<Mutation> tableMetadata = connection.getMutationState().toMutations().next().getSecond();
+ Long timeStamp = indexRef.getTable().isTransactional() ? indexRef.getTimeStamp() : null;
+ List<Mutation> tableMetadata = connection.getMutationState().toMutations(timeStamp).next().getSecond();
connection.rollback();
MetaDataMutationResult result = connection.getQueryServices().updateIndexState(tableMetadata, dataTableName);
@@ -3003,9 +3164,9 @@ public class MetaDataClient {
}
private PTable addTableToCache(MetaDataMutationResult result) throws SQLException {
- addIndexesFromPhysicalTable(result);
+ addIndexesFromPhysicalTable(result, null);
PTable table = result.getTable();
- connection.addTable(table);
+ connection.addTable(table, TransactionUtil.getResolvedTime(connection, result));
return table;
}
@@ -3025,7 +3186,8 @@ public class MetaDataClient {
*/
boolean isSharedIndex = table.getViewIndexId() != null;
if (isSharedIndex) {
- return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getClientTimeStamp());
+ // we are assuming the stats table is not transactional
+ return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getCurrentScn());
}
boolean isView = table.getType() == PTableType.VIEW;
String physicalName = table.getPhysicalName().getString();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
index f015177..a3103bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.schema;
+import java.sql.SQLException;
+
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.query.MetaDataMutated;
@@ -28,7 +30,7 @@ public interface PMetaData extends MetaDataMutated, Iterable<PTable>, Cloneable
}
public int size();
public PMetaData clone();
- public PTable getTable(PTableKey key) throws TableNotFoundException;
+ public PTableRef getTableRef(PTableKey key) throws TableNotFoundException;
public PMetaData pruneTables(Pruner pruner);
public PFunction getFunction(PTableKey key) throws FunctionNotFoundException;
public PMetaData pruneFunctions(Pruner pruner);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index a15de13..9e4460d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -41,32 +41,12 @@ import com.google.common.primitives.Longs;
*
*/
public class PMetaDataImpl implements PMetaData {
- private static final class PTableRef {
- public final PTable table;
- public final int estSize;
- public volatile long lastAccessTime;
-
- public PTableRef(PTable table, long lastAccessTime, int estSize) {
- this.table = table;
- this.lastAccessTime = lastAccessTime;
- this.estSize = estSize;
- }
-
- public PTableRef(PTable table, long lastAccessTime) {
- this (table, lastAccessTime, table.getEstimatedSize());
- }
-
- public PTableRef(PTableRef tableRef) {
- this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize);
- }
- }
-
private static class PMetaDataCache implements Cloneable {
private static final int MIN_REMOVAL_SIZE = 3;
private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() {
@Override
public int compare(PTableRef tableRef1, PTableRef tableRef2) {
- return Longs.compare(tableRef1.lastAccessTime, tableRef2.lastAccessTime);
+ return Longs.compare(tableRef1.getLastAccessTime(), tableRef2.getLastAccessTime());
}
};
private static final MinMaxPriorityQueue.Builder<PTableRef> BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR);
@@ -97,7 +77,7 @@ public class PMetaDataImpl implements PMetaData {
Map<PTableKey,PTableRef> newTables = newMap(Math.max(tables.size(),expectedCapacity));
// Copy value so that access time isn't changing anymore
for (PTableRef tableAccess : tables.values()) {
- newTables.put(tableAccess.table.getKey(), new PTableRef(tableAccess));
+ newTables.put(tableAccess.getTable().getKey(), new PTableRef(tableAccess));
}
return newTables;
}
@@ -133,7 +113,7 @@ public class PMetaDataImpl implements PMetaData {
if (tableAccess == null) {
return null;
}
- tableAccess.lastAccessTime = timeKeeper.getCurrentTime();
+ tableAccess.setLastAccessTime(timeKeeper.getCurrentTime());
return tableAccess;
}
@@ -157,37 +137,37 @@ public class PMetaDataImpl implements PMetaData {
// Add to new cache, but track references to remove when done
// to bring cache at least overage amount below it's max size.
for (PTableRef tableRef : this.tables.values()) {
- newCache.put(tableRef.table.getKey(), new PTableRef(tableRef));
+ newCache.put(tableRef.getTable().getKey(), new PTableRef(tableRef));
toRemove.add(tableRef);
- toRemoveBytes += tableRef.estSize;
- while (toRemoveBytes - toRemove.peekLast().estSize >= overage) {
+ toRemoveBytes += tableRef.getEstSize();
+ while (toRemoveBytes - toRemove.peekLast().getEstSize() >= overage) {
PTableRef removedRef = toRemove.removeLast();
- toRemoveBytes -= removedRef.estSize;
+ toRemoveBytes -= removedRef.getEstSize();
}
}
for (PTableRef toRemoveRef : toRemove) {
- newCache.remove(toRemoveRef.table.getKey());
+ newCache.remove(toRemoveRef.getTable().getKey());
}
return newCache;
}
private PTable put(PTableKey key, PTableRef ref) {
- currentByteSize += ref.estSize;
+ currentByteSize += ref.getEstSize();
PTableRef oldTableAccess = this.tables.put(key, ref);
PTable oldTable = null;
if (oldTableAccess != null) {
- currentByteSize -= oldTableAccess.estSize;
- oldTable = oldTableAccess.table;
+ currentByteSize -= oldTableAccess.getEstSize();
+ oldTable = oldTableAccess.getTable();
}
return oldTable;
}
- public PTable put(PTableKey key, PTable value) {
- return put(key, new PTableRef(value, timeKeeper.getCurrentTime()));
+ public PTable put(PTableKey key, PTable value, long resolvedTime) {
+ return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), resolvedTime));
}
- public PTable putDuplicate(PTableKey key, PTable value) {
- return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0));
+ public PTable putDuplicate(PTableKey key, PTable value, long resolvedTime) {
+ return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0, resolvedTime));
}
public PTable remove(PTableKey key) {
@@ -195,8 +175,8 @@ public class PMetaDataImpl implements PMetaData {
if (value == null) {
return null;
}
- currentByteSize -= value.estSize;
- return value.table;
+ currentByteSize -= value.getEstSize();
+ return value.getTable();
}
public Iterator<PTable> iterator() {
@@ -210,7 +190,7 @@ public class PMetaDataImpl implements PMetaData {
@Override
public PTable next() {
- return iterator.next().table;
+ return iterator.next().getTable();
}
@Override
@@ -254,12 +234,12 @@ public class PMetaDataImpl implements PMetaData {
}
@Override
- public PTable getTable(PTableKey key) throws TableNotFoundException {
+ public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
PTableRef ref = metaData.get(key);
if (ref == null) {
throw new TableNotFoundException(key.getName());
}
- return ref.table;
+ return ref;
}
@Override
@@ -276,22 +256,29 @@ public class PMetaDataImpl implements PMetaData {
return metaData.size();
}
+ @Override
+ public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+ PMetaDataCache clone = metaData.clone();
+ clone.putDuplicate(table.getKey(), table, resolvedTimestamp);
+ return new PMetaDataImpl(clone);
+ }
@Override
- public PMetaData addTable(PTable table) throws SQLException {
+ public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
int netGain = 0;
PTableKey key = table.getKey();
PTableRef oldTableRef = metaData.get(key);
if (oldTableRef != null) {
- netGain -= oldTableRef.estSize;
+ netGain -= oldTableRef.getEstSize();
}
PTable newParentTable = null;
+ long parentResolvedTimestamp = resolvedTime;
if (table.getParentName() != null) { // Upsert new index table into parent data table list
String parentName = table.getParentName().getString();
PTableRef oldParentRef = metaData.get(new PTableKey(table.getTenantId(), parentName));
// If parentTable isn't cached, that's ok we can skip this
if (oldParentRef != null) {
- List<PTable> oldIndexes = oldParentRef.table.getIndexes();
+ List<PTable> oldIndexes = oldParentRef.getTable().getIndexes();
List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size() + 1);
newIndexes.addAll(oldIndexes);
for (int i = 0; i < newIndexes.size(); i++) {
@@ -302,8 +289,8 @@ public class PMetaDataImpl implements PMetaData {
}
}
newIndexes.add(table);
- netGain -= oldParentRef.estSize;
- newParentTable = PTableImpl.makePTable(oldParentRef.table, table.getTimeStamp(), newIndexes);
+ netGain -= oldParentRef.getEstSize();
+ newParentTable = PTableImpl.makePTable(oldParentRef.getTable(), table.getTimeStamp(), newIndexes);
netGain += newParentTable.getEstimatedSize();
}
}
@@ -314,24 +301,24 @@ public class PMetaDataImpl implements PMetaData {
PMetaDataCache newMetaData = overage <= 0 ? metaData.clone() : metaData.cloneMinusOverage(overage);
if (newParentTable != null) { // Upsert new index table into parent data table list
- newMetaData.put(newParentTable.getKey(), newParentTable);
- newMetaData.putDuplicate(table.getKey(), table);
+ newMetaData.put(newParentTable.getKey(), newParentTable, parentResolvedTimestamp);
+ newMetaData.putDuplicate(table.getKey(), table, resolvedTime);
} else {
- newMetaData.put(table.getKey(), table);
+ newMetaData.put(table.getKey(), table, resolvedTime);
}
for (PTable index : table.getIndexes()) {
- newMetaData.putDuplicate(index.getKey(), index);
+ newMetaData.putDuplicate(index.getKey(), index, resolvedTime);
}
return new PMetaDataImpl(newMetaData);
}
@Override
- public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
+ public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
PTableRef oldTableRef = metaData.get(new PTableKey(tenantId, tableName));
if (oldTableRef == null) {
return this;
}
- List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.table);
+ List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.getTable());
List<PColumn> newColumns;
if (columnsToAdd.isEmpty()) {
newColumns = oldColumns;
@@ -340,8 +327,8 @@ public class PMetaDataImpl implements PMetaData {
newColumns.addAll(oldColumns);
newColumns.addAll(columnsToAdd);
}
- PTable newTable = PTableImpl.makePTable(oldTableRef.table, tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
- return addTable(newTable);
+ PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional);
+ return addTable(newTable, resolvedTime);
}
@Override
@@ -368,7 +355,7 @@ public class PMetaDataImpl implements PMetaData {
}
// also remove its reference from parent table
if (parentTableRef != null) {
- List<PTable> oldIndexes = parentTableRef.table.getIndexes();
+ List<PTable> oldIndexes = parentTableRef.getTable().getIndexes();
if(oldIndexes != null && !oldIndexes.isEmpty()) {
List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size());
newIndexes.addAll(oldIndexes);
@@ -377,13 +364,13 @@ public class PMetaDataImpl implements PMetaData {
if (index.getName().getString().equals(tableName)) {
newIndexes.remove(i);
PTable parentTable = PTableImpl.makePTable(
- parentTableRef.table,
- tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.table.getTimeStamp() : tableTimeStamp,
+ parentTableRef.getTable(),
+ tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.getTable().getTimeStamp() : tableTimeStamp,
newIndexes);
if (tables == null) {
tables = metaData.clone();
}
- tables.put(parentTable.getKey(), parentTable);
+ tables.put(parentTable.getKey(), parentTable, parentTableRef.getResolvedTimeStamp());
break;
}
}
@@ -393,12 +380,12 @@ public class PMetaDataImpl implements PMetaData {
}
@Override
- public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException {
+ public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException {
PTableRef tableRef = metaData.get(new PTableKey(tenantId, tableName));
if (tableRef == null) {
return this;
}
- PTable table = tableRef.table;
+ PTable table = tableRef.getTable();
PMetaDataCache tables = metaData.clone();
for (PColumn columnToRemove : columnsToRemove) {
PColumn column;
@@ -427,7 +414,7 @@ public class PMetaDataImpl implements PMetaData {
table = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, columns);
}
- tables.put(table.getKey(), table);
+ tables.put(table.getKey(), table, resolvedTime);
return new PMetaDataImpl(tables);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index a2979d4..ec97394 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -307,13 +307,14 @@ public interface PTable extends PMetaDataEntity {
PName getPhysicalName();
boolean isImmutableRows();
- void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection);
+ boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection);
IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection);
PName getDefaultFamilyName();
boolean isWALDisabled();
boolean isMultiTenant();
boolean getStoreNulls();
+ boolean isTransactional();
ViewType getViewType();
String getViewStatement();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index ff4b512..0827ea7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -77,6 +77,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sun.istack.NotNull;
+import co.cask.tephra.TxConstants;
+
/**
*
* Base class for PTable implementors. Provides abstraction for
@@ -123,6 +125,7 @@ public class PTableImpl implements PTable {
private boolean disableWAL;
private boolean multiTenant;
private boolean storeNulls;
+ private boolean isTransactional;
private ViewType viewType;
private Short viewIndexId;
private int estimatedSize;
@@ -202,7 +205,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+ table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
}
public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
@@ -211,7 +214,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+ table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
}
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
@@ -220,7 +223,7 @@ public class PTableImpl implements PTable {
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
}
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -229,16 +232,16 @@ public class PTableImpl implements PTable {
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
- table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+ table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
}
- public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
+ public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional);
}
public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -248,7 +251,7 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+ table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
}
public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
@@ -258,7 +261,7 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
- table.getBaseColumnCount(), rowKeyOrderOptimizable);
+ table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional());
}
public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException {
@@ -268,28 +271,28 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats,
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
}
public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
PName pkName, Integer bucketNum, List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
- boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable) throws SQLException {
+ boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional) throws SQLException {
return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
- indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable);
+ indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional);
}
public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum,
List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
- boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable)
+ boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional)
throws SQLException {
return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
- defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable);
+ defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional);
}
private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -297,10 +300,10 @@ public class PTableImpl implements PTable {
PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
- PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException {
+ PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional) throws SQLException {
init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
- viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable);
+ viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional);
}
@Override
@@ -328,7 +331,7 @@ public class PTableImpl implements PTable {
PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
- IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException {
+ IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional) throws SQLException {
Preconditions.checkNotNull(schemaName);
Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -357,6 +360,7 @@ public class PTableImpl implements PTable {
this.viewType = viewType;
this.viewIndexId = viewIndexId;
this.indexType = indexType;
+ this.isTransactional = isTransactional;
this.tableStats = stats;
this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
List<PColumn> pkColumns;
@@ -676,7 +680,7 @@ public class PTableImpl implements PTable {
private Put setValues;
private Delete unsetValues;
- private Delete deleteRow;
+ private Mutation deleteRow;
private final long ts;
public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) {
@@ -713,7 +717,8 @@ public class PTableImpl implements PTable {
// way HBase works.
addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
SchemaUtil.getEmptyColumnFamilyPtr(PTableImpl.this),
- QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts, ByteUtil.EMPTY_BYTE_ARRAY_PTR));
+ QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+ QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
mutations.add(setValues);
if (!unsetValues.isEmpty()) {
mutations.add(unsetValues);
@@ -779,11 +784,22 @@ public class PTableImpl implements PTable {
@Override
public void delete() {
newMutations();
- Delete delete = new Delete(key);
- for (PColumnFamily colFamily : families) {
- delete.addFamily(colFamily.getName().getBytes(), ts);
+ // We use the Tephra column family delete marker here to prevent Tephra's
+ // TransactionProcessor from translating deletes into puts.
+ if (PTableImpl.this.isTransactional()) {
+ Put delete = new Put(key);
+ for (PColumnFamily colFamily : families) {
+ delete.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+ HConstants.EMPTY_BYTE_ARRAY);
+ }
+ deleteRow = delete;
+ } else {
+ Delete delete = new Delete(key);
+ for (PColumnFamily colFamily : families) {
+ delete.deleteFamily(colFamily.getName().getBytes(), ts);
+ }
+ deleteRow = delete;
}
- deleteRow = delete;
// No need to write to the WAL for indexes
if (PTableImpl.this.getType() == PTableType.INDEX) {
deleteRow.setDurability(Durability.SKIP_WAL);
@@ -888,8 +904,8 @@ public class PTableImpl implements PTable {
}
@Override
- public synchronized void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
- if (indexMaintainersPtr == null) {
+ public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
+ if (indexMaintainersPtr == null || indexMaintainersPtr.getLength()==0) {
indexMaintainersPtr = new ImmutableBytesWritable();
if (indexes.isEmpty()) {
indexMaintainersPtr.set(ByteUtil.EMPTY_BYTE_ARRAY);
@@ -898,6 +914,7 @@ public class PTableImpl implements PTable {
}
}
ptr.set(indexMaintainersPtr.get(), indexMaintainersPtr.getOffset(), indexMaintainersPtr.getLength());
+ return indexMaintainersPtr.getLength() > 0;
}
@Override
@@ -1014,6 +1031,7 @@ public class PTableImpl implements PTable {
boolean disableWAL = table.getDisableWAL();
boolean multiTenant = table.getMultiTenant();
boolean storeNulls = table.getStoreNulls();
+ boolean isTransactional = table.getTransactional();
ViewType viewType = null;
String viewStatement = null;
List<PName> physicalNames = Collections.emptyList();
@@ -1044,7 +1062,7 @@ public class PTableImpl implements PTable {
result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
(bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes,
isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
- multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable);
+ multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional);
return result;
} catch (SQLException e) {
throw new RuntimeException(e); // Impossible
@@ -1124,6 +1142,7 @@ public class PTableImpl implements PTable {
builder.setDisableWAL(table.isWALDisabled());
builder.setMultiTenant(table.isMultiTenant());
builder.setStoreNulls(table.getStoreNulls());
+ builder.setTransactional(table.isTransactional());
if(table.getType() == PTableType.VIEW){
builder.setViewType(ByteStringer.wrap(new byte[]{table.getViewType().getSerializedValue()}));
builder.setViewStatement(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getViewStatement())));
@@ -1155,6 +1174,10 @@ public class PTableImpl implements PTable {
}
@Override
+ public boolean isTransactional() {
+ return isTransactional;
+ }
+
public int getBaseColumnCount() {
return baseColumnCount;
}