You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:43:40 UTC

[02/52] [abbrv] phoenix git commit: PHOENIX-1674 Snapshot isolation transaction support through Tephra (James Taylor, Thomas D'Silva)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 21548b0..cb9b831 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -72,6 +72,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
@@ -169,6 +170,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.UpdateStatisticsStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -193,10 +195,13 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.TxConstants;
+
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
@@ -230,8 +235,9 @@ public class MetaDataClient {
             VIEW_INDEX_ID + "," +
             INDEX_TYPE + "," +
             STORE_NULLS + "," +
-            BASE_COLUMN_COUNT +
-            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+            BASE_COLUMN_COUNT + "," +
+            TRANSACTIONAL +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String CREATE_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
             TENANT_ID + "," +
@@ -372,7 +378,7 @@ public class MetaDataClient {
         MetaDataMutationResult result = updateCache(schemaName, tableName, true);
         return result.getMutationTime();
     }
-
+    
     /**
      * Update the cache with the latest as of the connection scn.
      * @param schemaName
@@ -391,6 +397,10 @@ public class MetaDataClient {
     public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName) throws SQLException {
         return updateCache(tenantId, schemaName, tableName, false);
     }
+    
+    public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
+        return updateCache(tenantId, schemaName, tableName, alwaysHitServer, null);
+    }
 
     /**
      * Update the cache with the latest as of the connection scn.
@@ -415,24 +425,47 @@ public class MetaDataClient {
         long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
         return clientTimeStamp;
     }
-
+    
+    private long getCurrentScn() {
+    	Long scn = connection.getSCN();
+        long currentScn = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+        return currentScn;
+    }
+    
     private MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName,
-            boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] herez
-        long clientTimeStamp = getClientTimeStamp();
+            boolean alwaysHitServer, Long resolvedTimestamp) throws SQLException { // TODO: pass byte[] here
         boolean systemTable = SYSTEM_CATALOG_SCHEMA.equals(schemaName);
         // System tables must always have a null tenantId
         tenantId = systemTable ? null : tenantId;
         PTable table = null;
+        PTableRef tableRef = null;
         String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         long tableTimestamp = HConstants.LATEST_TIMESTAMP;
+        long tableResolvedTimestamp = HConstants.LATEST_TIMESTAMP;
         try {
-            table = connection.getTable(new PTableKey(tenantId, fullTableName));
+            tableRef = connection.getTableRef(new PTableKey(tenantId, fullTableName));
+            table = tableRef.getTable();
             tableTimestamp = table.getTimeStamp();
+            tableResolvedTimestamp = tableRef.getResolvedTimeStamp();
         } catch (TableNotFoundException e) {
         }
-        // Don't bother with server call: we can't possibly find a newer table
-        if (table != null && !alwaysHitServer && (systemTable || tableTimestamp == clientTimeStamp - 1)) {
-            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,QueryConstants.UNSET_TIMESTAMP,table);
+        
+        boolean defaultTransactional = connection.getQueryServices().getProps().getBoolean(
+							                QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB,
+							                QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+        // start a txn if all tables are transactional by default or if we found the table in the cache and it is transactional
+        // TODO if system tables become transactional remove the check 
+        boolean isTransactional = defaultTransactional || (table!=null && table.isTransactional());
+        if (!systemTable && isTransactional && !connection.getMutationState().isTransactionStarted()) {
+        	connection.getMutationState().startTransaction();
+        }
+        resolvedTimestamp = resolvedTimestamp==null ? TransactionUtil.getResolvedTimestamp(connection, isTransactional, HConstants.LATEST_TIMESTAMP) : resolvedTimestamp;
+        // Do not make rpc to getTable if 
+        // 1. table is a system table
+        // 2. table was already resolved as of that timestamp
+		if (table != null && !alwaysHitServer
+				&& (systemTable || resolvedTimestamp == tableResolvedTimestamp)) {
+            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table);
         }
 
         int maxTryCount = tenantId == null ? 1 : 2;
@@ -442,7 +475,12 @@ public class MetaDataClient {
         do {
             final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName);
             final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName);
-            result = connection.getQueryServices().getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, clientTimeStamp);
+            ConnectionQueryServices queryServices = connection.getQueryServices();
+			result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, resolvedTimestamp);
+			// if the table was assumed to be transactional, but is actually not transactional then re-resolve as of the right timestamp (and vice versa) 
+			if (table==null && result.getTable()!=null && result.getTable().isTransactional()!=isTransactional) {
+				result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, TransactionUtil.getResolvedTimestamp(connection, result.getTable().isTransactional(), HConstants.LATEST_TIMESTAMP));
+			}
 
             if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) {
                 return result;
@@ -466,15 +504,20 @@ public class MetaDataClient {
                 // timestamp, we can handle this such that we don't ask the
                 // server again.
                 if (table != null) {
+                    // Ensures that table in result is set to table found in our cache.
                     if (code == MutationCode.TABLE_ALREADY_EXISTS) {
-                    	// Ensures that table in result is set to table found in our cache.
-                        result.setTable(table);
-                        // Although this table is up-to-date, the parent table may not be.
-                        // In this case, we update the parent table which may in turn pull
-                        // in indexes to add to this table.
-                        if (addIndexesFromPhysicalTable(result)) {
-                            connection.addTable(result.getTable());
-                        }
+                    	result.setTable(table);
+                    	// Although this table is up-to-date, the parent table may not be.
+                    	// In this case, we update the parent table which may in turn pull
+                    	// in indexes to add to this table.
+                    	long resolvedTime = TransactionUtil.getResolvedTime(connection, result);
+						if (addIndexesFromPhysicalTable(result, resolvedTimestamp)) {
+                    	    connection.addTable(result.getTable(), resolvedTime);
+                    	}
+                    	else {
+                        	// if we aren't adding the table, we still need to update the resolved time of the table 
+                        	connection.updateResolvedTimestamp(table, resolvedTime);
+                    	}
                         return result;
                     }
                     // If table was not found at the current time stamp and we have one cached, remove it.
@@ -566,10 +609,11 @@ public class MetaDataClient {
      * of the table for which we just updated.
      * TODO: combine this round trip with the one that updates the cache for the child table.
      * @param result the result from updating the cache for the current table.
+     * @param resolvedTimestamp timestamp at which child table was resolved
      * @return true if the PTable contained by result was modified and false otherwise
      * @throws SQLException if the physical table cannot be found
      */
-    private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result) throws SQLException {
+    private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result, Long resolvedTimestamp) throws SQLException {
         PTable table = result.getTable();
         // If not a view or if a view directly over an HBase table, there's nothing to do
         if (table.getType() != PTableType.VIEW || table.getViewType() == ViewType.MAPPED) {
@@ -578,7 +622,7 @@ public class MetaDataClient {
         String physicalName = table.getPhysicalName().getString();
         String schemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
         String tableName = SchemaUtil.getTableNameFromFullName(physicalName);
-        MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false);
+        MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false, resolvedTimestamp);
         PTable physicalTable = parentResult.getTable();
         if (physicalTable == null) {
             throw new TableNotFoundException(schemaName, tableName);
@@ -782,7 +826,7 @@ public class MetaDataClient {
 
     public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, byte[][] viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
         PTable table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, null, null);
-        if (table == null || table.getType() == PTableType.VIEW) {
+        if (table == null || table.getType() == PTableType.VIEW || table.isTransactional()) {
             return new MutationState(0,connection);
         }
         // Hack to get around the case when an SCN is specified on the connection.
@@ -1137,7 +1181,7 @@ public class MetaDataClient {
                     if (hbaseVersion < PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setTableName(indexTableName.getTableName()).build().buildException();
                     }
-                    if (connection.getQueryServices().hasInvalidIndexConfiguration()) {
+                    if (!connection.getQueryServices().isMutableIndexWALCodecInstalled() && !dataTable.isTransactional()) {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setTableName(indexTableName.getTableName()).build().buildException();
                     }
                 }
@@ -1268,19 +1312,19 @@ public class MetaDataClient {
                 // as there's no need to burn another sequence value.
                 if (allocateIndexId && indexId == null) {
                     Long scn = connection.getSCN();
-                    long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                     PName tenantId = connection.getTenantId();
                     String tenantIdStr = tenantId == null ? null : connection.getTenantId().getString();
                     PName physicalName = dataTable.getPhysicalName();
                     int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
                     SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, nSequenceSaltBuckets);
-                    // Create at parent timestamp as we know that will be earlier than now
-                    // and earlier than any SCN if one is set.
+                    // If an SCN is set, create at scn-1 so that we can see the sequence; otherwise use the latest timestamp (so that the latest server time is used)
+                    long sequenceTimestamp = scn!=null ? scn-1 : HConstants.LATEST_TIMESTAMP;
                     createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
                         true, Short.MIN_VALUE, 1, 1, false, Long.MIN_VALUE, Long.MAX_VALUE,
-                        dataTable.getTimeStamp());
+                        sequenceTimestamp);
                     long[] seqValues = new long[1];
                     SQLException[] sqlExceptions = new SQLException[1];
+                    long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                     connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
                             Math.max(timestamp, dataTable.getTimeStamp()), seqValues, sqlExceptions);
                     if (sqlExceptions[0] != null) {
@@ -1397,7 +1441,7 @@ public class MetaDataClient {
             functionUpsert.setString(5, function.getJarPath());
             functionUpsert.setString(6, function.getReturnType());
             functionUpsert.execute();
-            functionData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            functionData.addAll(connection.getMutationState().toMutations(null).next().getSecond());
             connection.rollback();
             MetaDataMutationResult result = connection.getQueryServices().createFunction(functionData, function, stmt.isTemporary());
             MutationCode code = result.getMutationCode();
@@ -1498,29 +1542,36 @@ public class MetaDataClient {
             long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
             boolean multiTenant = false;
             boolean storeNulls = false;
+            boolean transactional = false;
             Integer saltBucketNum = null;
             String defaultFamilyName = null;
             boolean isImmutableRows = false;
             List<PName> physicalNames = Collections.emptyList();
             boolean addSaltColumn = false;
             boolean rowKeyOrderOptimizable = true;
+            Long timestamp = null;
             if (parent != null && tableType == PTableType.INDEX) {
-                // Index on view
-                // TODO: Can we support a multi-tenant index directly on a multi-tenant
-                // table instead of only a view? We don't have anywhere to put the link
-                // from the table to the index, though.
-                if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) {
-                    PName physicalName = parent.getPhysicalName();
-                    saltBucketNum = parent.getBucketNum();
-                    addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL);
-                    defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
-                    if (indexType == IndexType.LOCAL) {
-                        saltBucketNum = null;
-                        // Set physical name of local index table
-                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes())));
-                    } else {
-                        // Set physical name of view index table
-                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes())));
+            	transactional = parent.isTransactional();
+                timestamp = TransactionUtil.getTableTimestamp(connection, transactional);
+                storeNulls = parent.getStoreNulls();
+                if (tableType == PTableType.INDEX) {
+	                // Index on view
+	                // TODO: Can we support a multi-tenant index directly on a multi-tenant
+	                // table instead of only a view? We don't have anywhere to put the link
+	                // from the table to the index, though.
+	                if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) {
+	                	PName physicalName = parent.getPhysicalName();
+	                    saltBucketNum = parent.getBucketNum();
+	                    addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL);
+	                    defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
+	                    if (indexType == IndexType.LOCAL) {
+	                        saltBucketNum = null;
+	                        // Set physical name of local index table
+	                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes())));
+	                    } else {
+	                        // Set physical name of view index table
+	                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes())));
+	                    }
                     }
                 }
 
@@ -1536,7 +1587,7 @@ public class MetaDataClient {
                 incrementStatement.execute();
                 // Get list of mutations and add to table meta data that will be passed to server
                 // to guarantee order. This row will always end up last
-                tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                 connection.rollback();
 
                 // Add row linking from data table row to index table row
@@ -1561,12 +1612,10 @@ public class MetaDataClient {
             }
 
             Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
-            Map<String,Object> commonFamilyProps = Collections.emptyMap();
+            Map<String,Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
             // Somewhat hacky way of determining if property is for HColumnDescriptor or HTableDescriptor
             HColumnDescriptor defaultDescriptor = new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
             if (!statement.getProps().isEmpty()) {
-                commonFamilyProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
-
                 Collection<Pair<String,Object>> props = statement.getProps().get(QueryConstants.ALL_FAMILY_PROPERTIES_KEY);
                 for (Pair<String,Object> prop : props) {
                     if (defaultDescriptor.getValue(prop.getFirst()) == null) {
@@ -1580,7 +1629,7 @@ public class MetaDataClient {
             // Although unusual, it's possible to set a mapped VIEW as having immutable rows.
             // This tells Phoenix that you're managing the index maintenance yourself.
             if (tableType != PTableType.INDEX && (tableType != PTableType.VIEW || viewType == ViewType.MAPPED)) {
-                Boolean isImmutableRowsProp = (Boolean) tableProps.remove(PTable.IS_IMMUTABLE_ROWS_PROP_NAME);
+                Boolean isImmutableRowsProp = (Boolean) tableProps.get(PTable.IS_IMMUTABLE_ROWS_PROP_NAME);
                 if (isImmutableRowsProp == null) {
                     isImmutableRows = connection.getQueryServices().getProps().getBoolean(QueryServices.IMMUTABLE_ROWS_ATTRIB, QueryServicesOptions.DEFAULT_IMMUTABLE_ROWS);
                 } else {
@@ -1590,7 +1639,7 @@ public class MetaDataClient {
 
             // Can't set any of these on views or shared indexes on views
             if (tableType != PTableType.VIEW && indexId == null) {
-                saltBucketNum = (Integer) tableProps.remove(PhoenixDatabaseMetaData.SALT_BUCKETS);
+                saltBucketNum = (Integer) tableProps.get(PhoenixDatabaseMetaData.SALT_BUCKETS);
                 if (saltBucketNum != null) {
                     if (saltBucketNum < 0 || saltBucketNum > SaltingUtil.MAX_BUCKET_NUM) {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_BUCKET_NUM).build().buildException();
@@ -1607,42 +1656,110 @@ public class MetaDataClient {
                 addSaltColumn = (saltBucketNum != null);
             }
 
-            boolean removedProp = false;
             // Can't set MULTI_TENANT or DEFAULT_COLUMN_FAMILY_NAME on an INDEX or a non mapped VIEW
             if (tableType != PTableType.INDEX && (tableType != PTableType.VIEW || viewType == ViewType.MAPPED)) {
-                Boolean multiTenantProp = (Boolean) tableProps.remove(PhoenixDatabaseMetaData.MULTI_TENANT);
+                Boolean multiTenantProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.MULTI_TENANT);
                 multiTenant = Boolean.TRUE.equals(multiTenantProp);
-                defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);
-                removedProp = (defaultFamilyName != null);
+                defaultFamilyName = (String)tableProps.get(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);
             }
 
             boolean disableWAL = false;
-            Boolean disableWALProp = (Boolean) tableProps.remove(PhoenixDatabaseMetaData.DISABLE_WAL);
+            Boolean disableWALProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.DISABLE_WAL);
             if (disableWALProp != null) {
                 disableWAL = disableWALProp;
             }
 
-            Boolean storeNullsProp = (Boolean) tableProps.remove(PhoenixDatabaseMetaData.STORE_NULLS);
-            storeNulls = storeNullsProp == null
-                    ? connection.getQueryServices().getProps().getBoolean(
-                            QueryServices.DEFAULT_STORE_NULLS_ATTRIB,
-                            QueryServicesOptions.DEFAULT_STORE_NULLS)
-                    : storeNullsProp;
+            Boolean storeNullsProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.STORE_NULLS);
+            if (storeNullsProp == null) {
+                if (parent == null) {
+                    storeNulls = connection.getQueryServices().getProps().getBoolean(
+                                    QueryServices.DEFAULT_STORE_NULLS_ATTRIB,
+                                    QueryServicesOptions.DEFAULT_STORE_NULLS);
+                    tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.valueOf(storeNulls));
+                }
+            } else {
+                storeNulls = storeNullsProp;
+            }
+            Boolean transactionalProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.TRANSACTIONAL);
+            if (transactionalProp != null && parent != null) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL)
+                .setSchemaName(schemaName).setTableName(tableName)
+                .build().buildException();
+            }
+            if (parent == null) {
+                if (transactionalProp == null) {
+                    transactional = connection.getQueryServices().getProps().getBoolean(
+                                    QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB,
+                                    QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+                } else {
+                    transactional = transactionalProp;
+                }
+            }
+            tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, transactional);
+            if (transactional) {
+                // If TTL set, use Tephra TTL property name instead
+                Object ttl = commonFamilyProps.remove(HColumnDescriptor.TTL);
+                if (ttl != null) {
+                    commonFamilyProps.put(TxConstants.PROPERTY_TTL, ttl);
+                }
+            }
+            
+            boolean sharedTable = statement.getTableType() == PTableType.VIEW || indexId != null;
+            if (transactional) { 
+                // Tephra uses an empty value cell as its delete marker, so we need to turn on
+                // storeNulls for transactional tables.
+                // If we use regular column delete markers (which is what non transactional tables
+                // use), then they get converted
+                // on the server, but this can mess up our secondary index code as the changes get
+                // committed prior to the
+                // maintenance code being able to see the prior state to update the rows correctly.
+            	if (Boolean.FALSE.equals(storeNullsProp)) {
+            		throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL)
+            		.setSchemaName(schemaName).setTableName(tableName)
+            		.build().buildException();
+            	}
+            	// Force STORE_NULLS to true when transactional as Tephra cannot deal with column deletes
+            	storeNulls = true;
+            	tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.TRUE);
+            	
+            	if (!sharedTable) {
+                    Integer maxVersionsProp = (Integer) commonFamilyProps.get(HConstants.VERSIONS);
+                    if (maxVersionsProp == null) {
+                        if (parent != null) {
+                            HTableDescriptor desc = connection.getQueryServices().getTableDescriptor(parent.getPhysicalName().getBytes());
+                            if (desc != null) {
+                                maxVersionsProp = desc.getFamily(SchemaUtil.getEmptyColumnFamily(parent)).getMaxVersions();
+                            }
+                        }
+                        if (maxVersionsProp == null) {
+                            maxVersionsProp = connection.getQueryServices().getProps().getInt(
+                                    QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB,
+                                    QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL);
+                        }
+                        commonFamilyProps.put(HConstants.VERSIONS, maxVersionsProp);
+                    }
+            	}
+            }
+            timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional) : timestamp;
 
             // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
-            if ((statement.getTableType() == PTableType.VIEW || indexId != null) && !tableProps.isEmpty()) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
-                        .buildException();
-            }
-            if (removedProp) {
-                tableProps.put(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME, defaultFamilyName);
+            if (sharedTable) {
+                if (tableProps.get(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME) != null) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE)
+                    .setSchemaName(schemaName).setTableName(tableName)
+                    .build().buildException();
+                }
+                if (SchemaUtil.hasHTableDescriptorProps(tableProps)) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
+                            .buildException();
+                }
             }
 
             List<ColumnDef> colDefs = statement.getColumnDefs();
             List<PColumn> columns;
             LinkedHashSet<PColumn> pkColumns;
 
-            if (tenantId != null && (tableType != PTableType.VIEW && indexId == null)) {
+            if (tenantId != null && !sharedTable) {
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE)
                     .setSchemaName(schemaName).setTableName(tableName).build().buildException();
             }
@@ -1866,8 +1983,8 @@ public class MetaDataClient {
                         Collections.<PTable>emptyList(), isImmutableRows,
                         Collections.<PName>emptyList(), defaultFamilyName == null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true);
-                connection.addTable(table);
+                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false);
+                connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
                     int nIndexRowKeyColumns = isPK ? 1 : pkColumnsNames.size();
@@ -1922,7 +2039,7 @@ public class MetaDataClient {
                 addColumnMutation(schemaName, tableName, column, colUpsert, parentTableName, pkName, keySeq, saltBucketNum != null);
             }
 
-            tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
             connection.rollback();
 
             String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString();
@@ -1968,9 +2085,10 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setInt(20, BASE_TABLE_BASE_COLUMN_COUNT);
             }
+            tableUpsert.setBoolean(21, transactional);
             tableUpsert.execute();
 
-            tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
             connection.rollback();
 
             /*
@@ -1996,7 +2114,9 @@ public class MetaDataClient {
             MutationCode code = result.getMutationCode();
             switch(code) {
             case TABLE_ALREADY_EXISTS:
-                addTableToCache(result);
+                if (result.getTable() != null) { // Table can be null for a transactional table that already exists as an HBase table
+                    addTableToCache(result);
+                }
                 if (!statement.ifNotExists()) {
                     throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
                 }
@@ -2016,11 +2136,11 @@ public class MetaDataClient {
             default:
                 PName newSchemaName = PNameFactory.newName(schemaName);
                 PTable table =  PTableImpl.makePTable(
-                        tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(),
+                        tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, timestamp!=null ? timestamp : result.getMutationTime(),
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
                         dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        indexId, indexType, rowKeyOrderOptimizable);
+                        indexId, indexType, rowKeyOrderOptimizable, transactional);
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
@@ -2175,6 +2295,8 @@ public class MetaDataClient {
 
             MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade);
             MutationCode code = result.getMutationCode();
+            PTable table = result.getTable();
+			boolean transactional = table!=null && table.isTransactional();
             switch (code) {
             case TABLE_NOT_FOUND:
                 if (!ifExists) { throw new TableNotFoundException(schemaName, tableName); }
@@ -2186,12 +2308,10 @@ public class MetaDataClient {
 
                 .setSchemaName(schemaName).setTableName(tableName).build().buildException();
             default:
-                connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName,
-                        result.getMutationTime());
+				connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, result.getMutationTime());
 
                 if (result.getTable() != null && tableType != PTableType.VIEW) {
                     connection.setAutoCommit(true);
-                    PTable table = result.getTable();
                     boolean dropMetaData = result.getTable().getViewIndexId() == null &&
                             connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
                     long ts = (scn == null ? result.getMutationTime() : scn);
@@ -2334,12 +2454,12 @@ public class MetaDataClient {
         return mutationCode;
     }
 
-    private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta) throws SQLException {
-        return incrementTableSeqNum(table, expectedType, columnCountDelta, null, null, null, null);
+    private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional) throws SQLException {
+        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, null, null, null);
     }
 
     private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
-            Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant, Boolean storeNulls)
+            Boolean isTransactional, Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant, Boolean storeNulls)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -2371,6 +2491,9 @@ public class MetaDataClient {
         if (storeNulls != null) {
             mutateBooleanProperty(tenantId, schemaName, tableName, STORE_NULLS, storeNulls);
         }
+        if (isTransactional != null) {
+            mutateBooleanProperty(tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional);
+        }
         return seqNum;
     }
 
@@ -2404,10 +2527,13 @@ public class MetaDataClient {
             Boolean multiTenantProp = null;
             Boolean disableWALProp = null;
             Boolean storeNullsProp = null;
+            Boolean isTransactionalProp = null;
 
             ListMultimap<String,Pair<String,Object>> stmtProperties = statement.getProps();
             Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size());
-            PTable table = FromCompiler.getResolver(statement, connection).getTables().get(0).getTable();
+            TableRef tableRef = FromCompiler.getResolver(statement, connection).getTables().get(0);
+			PTable table = tableRef.getTable();
+            Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null;
             List<ColumnDef> columnDefs = statement.getColumnDefs();
             if (columnDefs == null) {
                 columnDefs = Collections.emptyList();
@@ -2426,6 +2552,8 @@ public class MetaDataClient {
                             disableWALProp = (Boolean)prop.getSecond();
                         } else if (propName.equals(STORE_NULLS)) {
                             storeNullsProp = (Boolean)prop.getSecond();
+                        } else if (propName.equals(TRANSACTIONAL)) {
+                            isTransactionalProp = (Boolean)prop.getSecond();
                         }
                     } 
                 }
@@ -2433,6 +2561,7 @@ public class MetaDataClient {
             }
             boolean retried = false;
             boolean changingPhoenixTableProperty = false;
+            boolean nonTxToTx = false;
             while (true) {
                 ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
                 table = resolver.getTables().get(0).getTable();
@@ -2487,6 +2616,23 @@ public class MetaDataClient {
                         changingPhoenixTableProperty = true;
                     }
                 }
+                Boolean isTransactional = null;
+                if (isTransactionalProp != null) {
+                    if (isTransactionalProp.booleanValue() != table.isTransactional()) {
+                        isTransactional = isTransactionalProp;
+                        // We can only go one way: from non transactional to transactional
+                        // Going the other way would require rewriting the cell timestamps
+                        // and doing a major compaction to get rid of any Tephra specific
+                        // delete markers.
+                        if (!isTransactional) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
+                            .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                        }
+                        timeStamp = TransactionUtil.getTableTimestamp(connection, isTransactional);
+                        changingPhoenixTableProperty = true;
+                        nonTxToTx = true;
+                    }
+                }
 
                 int numPkColumnsAdded = 0;
                 PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN_ALTER_TABLE);
@@ -2558,7 +2704,7 @@ public class MetaDataClient {
                         }
                     }
 
-                    columnMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    columnMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 } else {
                     // Check that HBase configured properly for mutable secondary indexing
@@ -2567,10 +2713,12 @@ public class MetaDataClient {
                     if (Boolean.FALSE.equals(isImmutableRows) && !table.getIndexes().isEmpty()) {
                         int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion();
                         if (hbaseVersion < PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) {
-                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES)
+                            .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                         }
-                        if (connection.getQueryServices().hasInvalidIndexConfiguration()) {
-                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                        if (connection.getQueryServices().isMutableIndexWALCodecInstalled() && !table.isTransactional()) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG)
+                            .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                         }
                     }
                     if (Boolean.TRUE.equals(multiTenant)) {
@@ -2578,17 +2726,17 @@ public class MetaDataClient {
                     }
                 }
 
-                if (numPkColumnsAdded>0 && !table.getIndexes().isEmpty()) {
+                if (!table.getIndexes().isEmpty() && (numPkColumnsAdded>0 || nonTxToTx)) {
                     for (PTable index : table.getIndexes()) {
-                        incrementTableSeqNum(index, index.getType(), numPkColumnsAdded);
+                        incrementTableSeqNum(index, index.getType(), numPkColumnsAdded, nonTxToTx ? Boolean.TRUE : null);
                     }
-                    tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
                 long seqNum = table.getSequenceNumber();
                 if (changingPhoenixTableProperty || columnDefs.size() > 0) { 
-                    seqNum = incrementTableSeqNum(table, statement.getTableType(), columnDefs.size(), isImmutableRows, disableWAL, multiTenant, storeNulls);
-                    tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    seqNum = incrementTableSeqNum(table, statement.getTableType(), columnDefs.size(), isTransactional, isImmutableRows, disableWAL, multiTenant, storeNulls);
+                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
                 
@@ -2626,10 +2774,22 @@ public class MetaDataClient {
                         return new MutationState(0,connection);
                     }
 
-                    // Only update client side cache if we aren't adding a PK column to a table with indexes.
+                    // Only update client side cache if we aren't adding a PK column to a table with indexes or
+                    // transitioning a table with indexes from non transactional to transactional.
                     // We could update the cache manually then too, it'd just be a pain.
-                    if (numPkColumnsAdded==0 || table.getIndexes().isEmpty()) {
-                        connection.addColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName), columns, result.getMutationTime(), seqNum, isImmutableRows == null ? table.isImmutableRows() : isImmutableRows, disableWAL == null ? table.isWALDisabled() : disableWAL, multiTenant == null ? table.isMultiTenant() : multiTenant, storeNulls == null ? table.getStoreNulls() : storeNulls);
+                    if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 && !nonTxToTx)) {
+						connection.addColumn(
+								tenantId,
+								SchemaUtil.getTableName(schemaName, tableName),
+								columns,
+								result.getMutationTime(),
+								seqNum,
+								isImmutableRows == null ? table.isImmutableRows() : isImmutableRows,
+								disableWAL == null ? table.isWALDisabled() : disableWAL,
+								multiTenant == null ? table.isMultiTenant() : multiTenant,
+								storeNulls == null ? table.getStoreNulls() : storeNulls, 
+								isTransactional == null ? table.isTransactional() : isTransactional,
+								TransactionUtil.getResolvedTime(connection, result));
                     }
                     // Delete rows in view index if we haven't dropped it already
                     // We only need to do this if the multiTenant transitioned to false
@@ -2760,12 +2920,12 @@ public class MetaDataClient {
             boolean retried = false;
             while (true) {
                 final ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
-                PTable table = resolver.getTables().get(0).getTable();
+                TableRef tableRef = resolver.getTables().get(0);
+				PTable table = tableRef.getTable();
                 List<ColumnName> columnRefs = statement.getColumnRefs();
                 if(columnRefs == null) {
                     columnRefs = Lists.newArrayListWithCapacity(0);
                 }
-                TableRef tableRef = null;
                 List<ColumnRef> columnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size() + table.getIndexes().size());
                 List<TableRef> indexesToDrop = Lists.newArrayListWithExpectedSize(table.getIndexes().size());
                 List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize((table.getIndexes().size() + 1) * (1 + table.getColumns().size() - columnRefs.size()));
@@ -2781,14 +2941,13 @@ public class MetaDataClient {
                         }
                         throw e;
                     }
-                    tableRef = columnRef.getTableRef();
                     PColumn columnToDrop = columnRef.getColumn();
                     tableColumnsToDrop.add(columnToDrop);
                     if (SchemaUtil.isPKColumn(columnToDrop)) {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_DROP_PK)
                             .setColumnName(columnToDrop.getName().getString()).build().buildException();
                     }
-                    columnsToDrop.add(new ColumnRef(tableRef, columnToDrop.getPosition()));
+                    columnsToDrop.add(new ColumnRef(columnRef.getTableRef(), columnToDrop.getPosition()));
                 }
 
                 dropColumnMutations(table, tableColumnsToDrop, tableMetaData);
@@ -2810,16 +2969,17 @@ public class MetaDataClient {
                         }
                     }
                     if(!indexColumnsToDrop.isEmpty()) {
-                        incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size());
+                        incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null);
                         dropColumnMutations(index, indexColumnsToDrop, tableMetaData);
                     }
 
                 }
-                tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null;
+                tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
 
-                long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size());
-                tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size(), null);
+                tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
                 // Force table header to be first in list
                 Collections.reverse(tableMetaData);
@@ -2870,7 +3030,7 @@ public class MetaDataClient {
                     // client-side cache as it would be too painful. Just let it pull it over from
                     // the server when needed.
                     if (tableColumnsToDrop.size() > 0 && indexesToDrop.isEmpty()) {
-                        connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum);
+                        connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result));
                     }
                     // If we have a VIEW, then only delete the metadata, and leave the table data alone
                     if (table.getType() != PTableType.VIEW) {
@@ -2955,7 +3115,8 @@ public class MetaDataClient {
                     tableUpsert.close();
                 }
             }
-            List<Mutation> tableMetadata = connection.getMutationState().toMutations().next().getSecond();
+            Long timeStamp = indexRef.getTable().isTransactional() ? indexRef.getTimeStamp() : null;
+			List<Mutation> tableMetadata = connection.getMutationState().toMutations(timeStamp).next().getSecond();
             connection.rollback();
 
             MetaDataMutationResult result = connection.getQueryServices().updateIndexState(tableMetadata, dataTableName);
@@ -3003,9 +3164,9 @@ public class MetaDataClient {
     }
 
     private PTable addTableToCache(MetaDataMutationResult result) throws SQLException {
-        addIndexesFromPhysicalTable(result);
+        addIndexesFromPhysicalTable(result, null);
         PTable table = result.getTable();
-        connection.addTable(table);
+        connection.addTable(table, TransactionUtil.getResolvedTime(connection, result));
         return table;
     }
 
@@ -3025,7 +3186,8 @@ public class MetaDataClient {
          */
         boolean isSharedIndex = table.getViewIndexId() != null;
         if (isSharedIndex) {
-            return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getClientTimeStamp());
+        	// we are assuming the stats table is not transactional
+            return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getCurrentScn());
         }
         boolean isView = table.getType() == PTableType.VIEW;
         String physicalName = table.getPhysicalName().getString();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
index f015177..a3103bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.schema;
 
+import java.sql.SQLException;
+
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.query.MetaDataMutated;
 
@@ -28,7 +30,7 @@ public interface PMetaData extends MetaDataMutated, Iterable<PTable>, Cloneable
     }
     public int size();
     public PMetaData clone();
-    public PTable getTable(PTableKey key) throws TableNotFoundException;
+    public PTableRef getTableRef(PTableKey key) throws TableNotFoundException;
     public PMetaData pruneTables(Pruner pruner);
     public PFunction getFunction(PTableKey key) throws FunctionNotFoundException;
     public PMetaData pruneFunctions(Pruner pruner);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index a15de13..9e4460d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -41,32 +41,12 @@ import com.google.common.primitives.Longs;
  *
  */
 public class PMetaDataImpl implements PMetaData {
-        private static final class PTableRef {
-            public final PTable table;
-            public final int estSize;
-            public volatile long lastAccessTime;
-            
-            public PTableRef(PTable table, long lastAccessTime, int estSize) {
-                this.table = table;
-                this.lastAccessTime = lastAccessTime;
-                this.estSize = estSize;
-            }
-
-            public PTableRef(PTable table, long lastAccessTime) {
-                this (table, lastAccessTime, table.getEstimatedSize());
-            }
-
-            public PTableRef(PTableRef tableRef) {
-                this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize);
-            }
-        }
-
         private static class PMetaDataCache implements Cloneable {
             private static final int MIN_REMOVAL_SIZE = 3;
             private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() {
                 @Override
                 public int compare(PTableRef tableRef1, PTableRef tableRef2) {
-                    return Longs.compare(tableRef1.lastAccessTime, tableRef2.lastAccessTime);
+                    return Longs.compare(tableRef1.getLastAccessTime(), tableRef2.getLastAccessTime());
                 }
             };
             private static final MinMaxPriorityQueue.Builder<PTableRef> BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR);
@@ -97,7 +77,7 @@ public class PMetaDataImpl implements PMetaData {
                 Map<PTableKey,PTableRef> newTables = newMap(Math.max(tables.size(),expectedCapacity));
                 // Copy value so that access time isn't changing anymore
                 for (PTableRef tableAccess : tables.values()) {
-                    newTables.put(tableAccess.table.getKey(), new PTableRef(tableAccess));
+                    newTables.put(tableAccess.getTable().getKey(), new PTableRef(tableAccess));
                 }
                 return newTables;
             }
@@ -133,7 +113,7 @@ public class PMetaDataImpl implements PMetaData {
                 if (tableAccess == null) {
                     return null;
                 }
-                tableAccess.lastAccessTime = timeKeeper.getCurrentTime();
+                tableAccess.setLastAccessTime(timeKeeper.getCurrentTime());
                 return tableAccess;
             }
             
@@ -157,37 +137,37 @@ public class PMetaDataImpl implements PMetaData {
                 // Add to new cache, but track references to remove when done
                 // to bring cache at least overage amount below it's max size.
                 for (PTableRef tableRef : this.tables.values()) {
-                    newCache.put(tableRef.table.getKey(), new PTableRef(tableRef));
+                    newCache.put(tableRef.getTable().getKey(), new PTableRef(tableRef));
                     toRemove.add(tableRef);
-                    toRemoveBytes += tableRef.estSize;
-                    while (toRemoveBytes - toRemove.peekLast().estSize >= overage) {
+                    toRemoveBytes += tableRef.getEstSize();
+                    while (toRemoveBytes - toRemove.peekLast().getEstSize() >= overage) {
                         PTableRef removedRef = toRemove.removeLast();
-                        toRemoveBytes -= removedRef.estSize;
+                        toRemoveBytes -= removedRef.getEstSize();
                     }
                 }
                 for (PTableRef toRemoveRef : toRemove) {
-                    newCache.remove(toRemoveRef.table.getKey());
+                    newCache.remove(toRemoveRef.getTable().getKey());
                 }
                 return newCache;
             }
 
             private PTable put(PTableKey key, PTableRef ref) {
-                currentByteSize += ref.estSize;
+                currentByteSize += ref.getEstSize();
                 PTableRef oldTableAccess = this.tables.put(key, ref);
                 PTable oldTable = null;
                 if (oldTableAccess != null) {
-                    currentByteSize -= oldTableAccess.estSize;
-                    oldTable = oldTableAccess.table;
+                    currentByteSize -= oldTableAccess.getEstSize();
+                    oldTable = oldTableAccess.getTable();
                 }
                 return oldTable;
             }
 
-            public PTable put(PTableKey key, PTable value) {
-                return put(key, new PTableRef(value, timeKeeper.getCurrentTime()));
+            public PTable put(PTableKey key, PTable value, long resolvedTime) {
+                return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), resolvedTime));
             }
             
-            public PTable putDuplicate(PTableKey key, PTable value) {
-                return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0));
+            public PTable putDuplicate(PTableKey key, PTable value, long resolvedTime) {
+                return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0, resolvedTime));
             }
             
             public PTable remove(PTableKey key) {
@@ -195,8 +175,8 @@ public class PMetaDataImpl implements PMetaData {
                 if (value == null) {
                     return null;
                 }
-                currentByteSize -= value.estSize;
-                return value.table;
+                currentByteSize -= value.getEstSize();
+                return value.getTable();
             }
             
             public Iterator<PTable> iterator() {
@@ -210,7 +190,7 @@ public class PMetaDataImpl implements PMetaData {
 
                     @Override
                     public PTable next() {
-                        return iterator.next().table;
+                        return iterator.next().getTable();
                     }
 
                     @Override
@@ -254,12 +234,12 @@ public class PMetaDataImpl implements PMetaData {
     }
     
     @Override
-    public PTable getTable(PTableKey key) throws TableNotFoundException {
+    public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
         PTableRef ref = metaData.get(key);
         if (ref == null) {
             throw new TableNotFoundException(key.getName());
         }
-        return ref.table;
+        return ref;
     }
 
     @Override
@@ -276,22 +256,29 @@ public class PMetaDataImpl implements PMetaData {
         return metaData.size();
     }
 
+    @Override
+    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+    	PMetaDataCache clone = metaData.clone(); // the change is applied to a clone, which is returned wrapped in a new PMetaDataImpl
+    	clone.putDuplicate(table.getKey(), table, resolvedTimestamp); // re-put the table under its own key, recording resolvedTimestamp with it
+    	return new PMetaDataImpl(clone);
+    }
 
     @Override
-    public PMetaData addTable(PTable table) throws SQLException {
+    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
         int netGain = 0;
         PTableKey key = table.getKey();
         PTableRef oldTableRef = metaData.get(key);
         if (oldTableRef != null) {
-            netGain -= oldTableRef.estSize;
+            netGain -= oldTableRef.getEstSize();
         }
         PTable newParentTable = null;
+        long parentResolvedTimestamp = resolvedTime;
         if (table.getParentName() != null) { // Upsert new index table into parent data table list
             String parentName = table.getParentName().getString();
             PTableRef oldParentRef = metaData.get(new PTableKey(table.getTenantId(), parentName));
             // If parentTable isn't cached, that's ok we can skip this
             if (oldParentRef != null) {
-                List<PTable> oldIndexes = oldParentRef.table.getIndexes();
+                List<PTable> oldIndexes = oldParentRef.getTable().getIndexes();
                 List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size() + 1);
                 newIndexes.addAll(oldIndexes);
                 for (int i = 0; i < newIndexes.size(); i++) {
@@ -302,8 +289,8 @@ public class PMetaDataImpl implements PMetaData {
                     }
                 }
                 newIndexes.add(table);
-                netGain -= oldParentRef.estSize;
-                newParentTable = PTableImpl.makePTable(oldParentRef.table, table.getTimeStamp(), newIndexes);
+                netGain -= oldParentRef.getEstSize();
+                newParentTable = PTableImpl.makePTable(oldParentRef.getTable(), table.getTimeStamp(), newIndexes);
                 netGain += newParentTable.getEstimatedSize();
             }
         }
@@ -314,24 +301,24 @@ public class PMetaDataImpl implements PMetaData {
         PMetaDataCache newMetaData = overage <= 0 ? metaData.clone() : metaData.cloneMinusOverage(overage);
         
         if (newParentTable != null) { // Upsert new index table into parent data table list
-            newMetaData.put(newParentTable.getKey(), newParentTable);
-            newMetaData.putDuplicate(table.getKey(), table);
+            newMetaData.put(newParentTable.getKey(), newParentTable, parentResolvedTimestamp);
+            newMetaData.putDuplicate(table.getKey(), table, resolvedTime);
         } else {
-            newMetaData.put(table.getKey(), table);
+            newMetaData.put(table.getKey(), table, resolvedTime);
         }
         for (PTable index : table.getIndexes()) {
-            newMetaData.putDuplicate(index.getKey(), index);
+            newMetaData.putDuplicate(index.getKey(), index, resolvedTime);
         }
         return new PMetaDataImpl(newMetaData);
     }
 
     @Override
-    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
+    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
         PTableRef oldTableRef = metaData.get(new PTableKey(tenantId, tableName));
         if (oldTableRef == null) {
             return this;
         }
-        List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.table);
+        List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.getTable());
         List<PColumn> newColumns;
         if (columnsToAdd.isEmpty()) {
             newColumns = oldColumns;
@@ -340,8 +327,8 @@ public class PMetaDataImpl implements PMetaData {
             newColumns.addAll(oldColumns);
             newColumns.addAll(columnsToAdd);
         }
-        PTable newTable = PTableImpl.makePTable(oldTableRef.table, tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
-        return addTable(newTable);
+        PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional);
+        return addTable(newTable, resolvedTime);
     }
 
     @Override
@@ -368,7 +355,7 @@ public class PMetaDataImpl implements PMetaData {
         }
         // also remove its reference from parent table
         if (parentTableRef != null) {
-            List<PTable> oldIndexes = parentTableRef.table.getIndexes();
+            List<PTable> oldIndexes = parentTableRef.getTable().getIndexes();
             if(oldIndexes != null && !oldIndexes.isEmpty()) {
                 List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size());
                 newIndexes.addAll(oldIndexes);
@@ -377,13 +364,13 @@ public class PMetaDataImpl implements PMetaData {
                     if (index.getName().getString().equals(tableName)) {
                         newIndexes.remove(i);
                         PTable parentTable = PTableImpl.makePTable(
-                                parentTableRef.table,
-                                tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.table.getTimeStamp() : tableTimeStamp,
+                                parentTableRef.getTable(),
+                                tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.getTable().getTimeStamp() : tableTimeStamp,
                                 newIndexes);
                         if (tables == null) { 
                             tables = metaData.clone();
                         }
-                        tables.put(parentTable.getKey(), parentTable);
+                        tables.put(parentTable.getKey(), parentTable, parentTableRef.getResolvedTimeStamp());
                         break;
                     }
                 }
@@ -393,12 +380,12 @@ public class PMetaDataImpl implements PMetaData {
     }
     
     @Override
-    public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException {
+    public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException {
         PTableRef tableRef = metaData.get(new PTableKey(tenantId, tableName));
         if (tableRef == null) {
             return this;
         }
-        PTable table = tableRef.table;
+        PTable table = tableRef.getTable();
         PMetaDataCache tables = metaData.clone();
         for (PColumn columnToRemove : columnsToRemove) {
             PColumn column;
@@ -427,7 +414,7 @@ public class PMetaDataImpl implements PMetaData {
             
             table = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, columns);
         }
-        tables.put(table.getKey(), table);
+        tables.put(table.getKey(), table, resolvedTime);
         return new PMetaDataImpl(tables);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index a2979d4..ec97394 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -307,13 +307,14 @@ public interface PTable extends PMetaDataEntity {
     PName getPhysicalName();
     boolean isImmutableRows();
 
-    void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection);
+    boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection);
     IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection);
     PName getDefaultFamilyName();
 
     boolean isWALDisabled();
     boolean isMultiTenant();
     boolean getStoreNulls();
+    boolean isTransactional();
 
     ViewType getViewType();
     String getViewStatement();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index ff4b512..0827ea7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -77,6 +77,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.sun.istack.NotNull;
 
+import co.cask.tephra.TxConstants;
+
 /**
  *
  * Base class for PTable implementors.  Provides abstraction for
@@ -123,6 +125,7 @@ public class PTableImpl implements PTable {
     private boolean disableWAL;
     private boolean multiTenant;
     private boolean storeNulls;
+    private boolean isTransactional;
     private ViewType viewType;
     private Short viewIndexId;
     private int estimatedSize;
@@ -202,7 +205,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
     }
 
     public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
@@ -211,7 +214,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
@@ -220,7 +223,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
                 table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -229,16 +232,16 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+                table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
     }
     
-    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional);
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -248,7 +251,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
     }
 
     public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
@@ -258,7 +261,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), rowKeyOrderOptimizable);
+                table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional());
     }
 
     public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException {
@@ -268,28 +271,28 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats,
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
             PName pkName, Integer bucketNum, List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
-            boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable) throws SQLException {
+            boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
-                indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable);
+                indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional);
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
             PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum,
             List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
-            boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable)
+            boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
-                defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable);
+                defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional);
     }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -297,10 +300,10 @@ public class PTableImpl implements PTable {
             PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
-            PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException {
+            PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
-                viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable);
+                viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional);
     }
 
     @Override
@@ -328,7 +331,7 @@ public class PTableImpl implements PTable {
             PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
-            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException {
+            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -357,6 +360,7 @@ public class PTableImpl implements PTable {
         this.viewType = viewType;
         this.viewIndexId = viewIndexId;
         this.indexType = indexType;
+        this.isTransactional = isTransactional;
         this.tableStats = stats;
         this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
         List<PColumn> pkColumns;
@@ -676,7 +680,7 @@ public class PTableImpl implements PTable {
 
         private Put setValues;
         private Delete unsetValues;
-        private Delete deleteRow;
+        private Mutation deleteRow;
         private final long ts;
 
         public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) {
@@ -713,7 +717,8 @@ public class PTableImpl implements PTable {
                 // way HBase works.
                 addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
                     SchemaUtil.getEmptyColumnFamilyPtr(PTableImpl.this),
-                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts, ByteUtil.EMPTY_BYTE_ARRAY_PTR));
+                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+                    QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
                 mutations.add(setValues);
                 if (!unsetValues.isEmpty()) {
                     mutations.add(unsetValues);
@@ -779,11 +784,22 @@ public class PTableImpl implements PTable {
         @Override
         public void delete() {
             newMutations();
-            Delete delete = new Delete(key);
-            for (PColumnFamily colFamily : families) {
-            	delete.addFamily(colFamily.getName().getBytes(), ts);
+            // we're using the Tephra column family delete marker here to prevent the translation 
+            // of deletes to puts by the Tephra's TransactionProcessor
+            if (PTableImpl.this.isTransactional()) {
+                Put delete = new Put(key);
+                for (PColumnFamily colFamily : families) {
+                    delete.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+                            HConstants.EMPTY_BYTE_ARRAY);
+                }
+                deleteRow = delete;                
+            } else {
+                Delete delete = new Delete(key);
+                for (PColumnFamily colFamily : families) {
+                	delete.deleteFamily(colFamily.getName().getBytes(), ts);
+                }
+                deleteRow = delete;
             }
-            deleteRow = delete;
             // No need to write to the WAL for indexes
             if (PTableImpl.this.getType() == PTableType.INDEX) {
                 deleteRow.setDurability(Durability.SKIP_WAL);
@@ -888,8 +904,8 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public synchronized void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
-        if (indexMaintainersPtr == null) {
+    public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
+        if (indexMaintainersPtr == null || indexMaintainersPtr.getLength()==0) {
             indexMaintainersPtr = new ImmutableBytesWritable();
             if (indexes.isEmpty()) {
                 indexMaintainersPtr.set(ByteUtil.EMPTY_BYTE_ARRAY);
@@ -898,6 +914,7 @@ public class PTableImpl implements PTable {
             }
         }
         ptr.set(indexMaintainersPtr.get(), indexMaintainersPtr.getOffset(), indexMaintainersPtr.getLength());
+        return indexMaintainersPtr.getLength() > 0;
     }
 
     @Override
@@ -1014,6 +1031,7 @@ public class PTableImpl implements PTable {
       boolean disableWAL = table.getDisableWAL();
       boolean multiTenant = table.getMultiTenant();
       boolean storeNulls = table.getStoreNulls();
+      boolean isTransactional = table.getTransactional();
       ViewType viewType = null;
       String viewStatement = null;
       List<PName> physicalNames = Collections.emptyList();
@@ -1044,7 +1062,7 @@ public class PTableImpl implements PTable {
         result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
           (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes,
               isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
-                multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable);
+                multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional);
         return result;
       } catch (SQLException e) {
         throw new RuntimeException(e); // Impossible
@@ -1124,6 +1142,7 @@ public class PTableImpl implements PTable {
       builder.setDisableWAL(table.isWALDisabled());
       builder.setMultiTenant(table.isMultiTenant());
       builder.setStoreNulls(table.getStoreNulls());
+      builder.setTransactional(table.isTransactional());
       if(table.getType() == PTableType.VIEW){
         builder.setViewType(ByteStringer.wrap(new byte[]{table.getViewType().getSerializedValue()}));
         builder.setViewStatement(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getViewStatement())));
@@ -1155,6 +1174,10 @@ public class PTableImpl implements PTable {
     }
 
     @Override
+    public boolean isTransactional() {
+        return isTransactional;
+    }
+
     public int getBaseColumnCount() {
         return baseColumnCount;
     }