You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:43:41 UTC

[03/52] [abbrv] phoenix git commit: PHOENIX-1674 Snapshot isolation transaction support through Tephra (James Taylor, Thomas D'Silva)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 18a97bd..2a08f45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -27,6 +27,7 @@ import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -115,6 +116,7 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.index.PhoenixTransactionalIndexer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -163,9 +165,20 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.PooledClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.hbase11.coprocessor.TransactionProcessor;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
@@ -183,6 +196,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     // Max number of cached table stats for view or shared index physical tables
     private static final int MAX_TABLE_STATS_CACHE_ENTRIES = 512;
     protected final Configuration config;
+    private final ConnectionInfo connectionInfo;
     // Copy of config.getProps(), but read-only to prevent synchronization that we
     // don't need.
     private final ReadOnlyProps props;
@@ -197,7 +211,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     // Lowest HBase version on the cluster.
     private int lowestClusterHBaseVersion = Integer.MAX_VALUE;
-    private boolean hasInvalidIndexConfiguration = false;
+    private boolean isMutableIndexWALCodecInstalled = true;
 
     @GuardedBy("connectionCountLock")
     private int connectionCount = 0;
@@ -205,6 +219,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
 
@@ -257,6 +272,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         for (Entry<String,String> entry : connectionInfo.asProps()) {
             config.set(entry.getKey(), entry.getValue());
         }
+        this.connectionInfo = connectionInfo;
 
         // Without making a copy of the configuration we cons up, we lose some of our properties
         // on the server side during testing.
@@ -282,6 +298,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         this.returnSequenceValues = config.getBoolean(QueryServices.RETURN_SEQUENCE_VALUES_ATTRIB, QueryServicesOptions.DEFAULT_RETURN_SEQUENCE_VALUES);
     }
 
+    @Override
+    public TransactionSystemClient getTransactionSystemClient() {
+        return txServiceClient;
+    }
+    
+    private void initTxServiceClient() {
+        String zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
+        ZKClientService zkClientService = ZKClientServices.delegate(
+                  ZKClients.reWatchOnExpire(
+                    ZKClients.retryOnFailure(
+                      ZKClientService.Builder.of(zkQuorumServersString)
+                        .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                        .build(),
+                      RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+                    )
+                  )
+                );
+        zkClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        PooledClientProvider pooledClientProvider = new PooledClientProvider(
+                config, zkDiscoveryService);
+        this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
+    }
+    
     private void openConnection() throws SQLException {
         try {
             // check if we need to authenticate with kerberos
@@ -293,6 +333,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 User.login(config, HBASE_CLIENT_KEYTAB, HBASE_CLIENT_PRINCIPAL, null);
                 logger.info("Successfull login to secure cluster!!");
             }
+            initTxServiceClient();
             this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
@@ -308,9 +349,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, null);
         } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
-            byte[][] schemaAndTableName = new byte[2][];
-            SchemaUtil.getVarChars(tableName, schemaAndTableName);
-            throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1]));
+            throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName));
         } catch (IOException e) {
         	throw new SQLException(e);
         }
@@ -457,18 +496,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
-    public PMetaData addTable(PTable table) throws SQLException {
+    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
         synchronized (latestMetaDataLock) {
             try {
                 throwConnectionClosedIfNullMetaData();
                 // If existing table isn't older than new table, don't replace
                 // If a client opens a connection at an earlier timestamp, this can happen
-                PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
+                PTable existingTable = latestMetaData.getTableRef(new PTableKey(table.getTenantId(), table.getName().getString())).getTable();
                 if (existingTable.getTimeStamp() >= table.getTimeStamp()) {
                     return latestMetaData;
                 }
             } catch (TableNotFoundException e) {}
-            latestMetaData = latestMetaData.addTable(table);
+            latestMetaData = latestMetaData.addTable(table, resolvedTime);
+            latestMetaDataLock.notifyAll();
+            return latestMetaData;
+        }
+    }
+    
+    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
+    	synchronized (latestMetaDataLock) {
+            throwConnectionClosedIfNullMetaData();
+            latestMetaData = latestMetaData.updateResolvedTimestamp(table, resolvedTime);
             latestMetaDataLock.notifyAll();
             return latestMetaData;
         }
@@ -490,7 +538,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             while (true) {
                 try {
                     try {
-                        table = metaData.getTable(new PTableKey(tenantId, tableName));
+                        table = metaData.getTableRef(new PTableKey(tenantId, tableName)).getTable();
                         /* If the table is at the prior sequence number, then we're good to go.
                          * We know if we've got this far, that the server validated the mutations,
                          * so we'd just need to wait until the other connection that mutated the same
@@ -531,12 +579,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      }
 
     @Override
-    public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls) throws SQLException {
+    public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls, final boolean isTransactional, final long resolvedTime) throws SQLException {
         return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
             @Override
             public PMetaData mutate(PMetaData metaData) throws SQLException {
                 try {
-                    return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
+                    return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
                 } catch (TableNotFoundException e) {
                     // The DROP TABLE may have been processed first, so just ignore.
                     return metaData;
@@ -556,12 +604,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
-    public PMetaData removeColumn(final PName tenantId, final String tableName, final List<PColumn> columnsToRemove, final long tableTimeStamp, final long tableSeqNum) throws SQLException {
+    public PMetaData removeColumn(final PName tenantId, final String tableName, final List<PColumn> columnsToRemove, final long tableTimeStamp, final long tableSeqNum, final long resolvedTime) throws SQLException {
         return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
             @Override
             public PMetaData mutate(PMetaData metaData) throws SQLException {
                 try {
-                    return metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum);
+                    return metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
                 } catch (TableNotFoundException e) {
                     // The DROP TABLE may have been processed first, so just ignore.
                     return metaData;
@@ -594,25 +642,40 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             for (Entry<String,Object> entry : family.getSecond().entrySet()) {
                 String key = entry.getKey();
                 Object value = entry.getValue();
-                columnDesc.setValue(key, value == null ? null : value.toString());
+                setHColumnDescriptorValue(columnDesc, key, value);
             }
         }
         return columnDesc;
     }
-
-    private void modifyColumnFamilyDescriptor(HColumnDescriptor hcd, Pair<byte[], Map<String,Object>> family) throws SQLException {
-        if (Bytes.equals(hcd.getName(), family.getFirst())) {
-            modifyColumnFamilyDescriptor(hcd, family.getSecond());
+    
+    // Workaround HBASE-14737
+    private static void setHColumnDescriptorValue(HColumnDescriptor columnDesc, String key, Object value) {
+        if (HConstants.VERSIONS.equals(key)) {
+            columnDesc.setMaxVersions(getMaxVersion(value));
         } else {
-            throw new IllegalArgumentException("Column family names don't match. Column descriptor family name: " + hcd.getNameAsString() + ", Family name: " + Bytes.toString(family.getFirst()));
+            columnDesc.setValue(key, value == null ? null : value.toString());
         }
     }
+
+    private static int getMaxVersion(Object value) {
+        if (value == null) {
+            return -1;  // HColumnDescriptor.UNINITIALIZED is private
+        }
+        if (value instanceof Number) {
+            return ((Number)value).intValue();
+        }
+        String stringValue = value.toString();
+        if (stringValue.isEmpty()) {
+            return -1;
+        }
+        return Integer.parseInt(stringValue);
+    }
     
     private void modifyColumnFamilyDescriptor(HColumnDescriptor hcd, Map<String,Object> props) throws SQLException {
         for (Entry<String, Object> entry : props.entrySet()) {
             String propName = entry.getKey();
             Object value = entry.getValue();
-            hcd.setValue(propName, value == null ? null : value.toString());
+            setHColumnDescriptorValue(hcd, propName, value);
         }
     }
     
@@ -622,8 +685,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
           new HTableDescriptor(TableName.valueOf(tableName));
         for (Entry<String,Object> entry : tableProps.entrySet()) {
             String key = entry.getKey();
-            Object value = entry.getValue();
-            tableDescriptor.setValue(key, value == null ? null : value.toString());
+            if (!TableProperty.isPhoenixTableProperty(key)) {
+                Object value = entry.getValue();
+                tableDescriptor.setValue(key, value == null ? null : value.toString());
+            }
         }
         if (families.isEmpty()) {
             if (tableType != PTableType.VIEW) {
@@ -649,16 +714,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     tableDescriptor.addFamily(columnDescriptor);
                 } else {
                     if (tableType != PTableType.VIEW) {
-                        modifyColumnFamilyDescriptor(tableDescriptor.getFamily(familyByte), family);
+                        HColumnDescriptor columnDescriptor = tableDescriptor.getFamily(familyByte);
+                        if (columnDescriptor == null) {
+                            throw new IllegalArgumentException("Unable to find column descriptor with family name " + Bytes.toString(family.getFirst()));
+                        }
+                        modifyColumnFamilyDescriptor(columnDescriptor, family.getSecond());
                     }
                 }
             }
         }
-        addCoprocessors(tableName, tableDescriptor, tableType);
+        addCoprocessors(tableName, tableDescriptor, tableType, tableProps);
         return tableDescriptor;
     }
 
-    private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType) throws SQLException {
+    private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String,Object> tableProps) throws SQLException {
         // The phoenix jar must be available on HBase classpath
         int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
         try {
@@ -674,17 +743,35 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
             }
+            boolean isTransactional = 
+                    Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) ||
+                    Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
             // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
             // all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318).
             if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW)
                     && !SchemaUtil.isMetaTable(tableName)
-                    && !SchemaUtil.isStatsTable(tableName)
-                    && !descriptor.hasCoprocessor(Indexer.class.getName())) {
-                Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
-                opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
-                Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+                    && !SchemaUtil.isStatsTable(tableName)) {
+                if (isTransactional) {
+                    if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+                        descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
+                    }
+                    // For alter table, remove non transactional index coprocessor
+                    if (descriptor.hasCoprocessor(Indexer.class.getName())) {
+                        descriptor.removeCoprocessor(Indexer.class.getName());
+                    }
+                } else {
+                    if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+                        // If exception on alter table to transition back to non transactional
+                        if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+                            descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
+                        }
+                        Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+                        opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+                        Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+                    }
+                }
             }
             if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
                 descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
@@ -722,6 +809,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null);
                 }
             }
+            
+            if (isTransactional) {
+                if (!descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+                    descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
+                }
+            } else {
+                // If exception on alter table to transition back to non transactional
+                if (descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+                    descriptor.removeCoprocessor(TransactionProcessor.class.getName());
+                }                
+            }
         } catch (IOException e) {
             throw ServerUtil.parseServerException(e);
         }
@@ -879,8 +977,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     checkClientServerCompatibility();
                 }
 
-                if (!modifyExistingMetaData || existingDesc.equals(newDesc)) {
-                    return existingDesc;
+                if (!modifyExistingMetaData) {
+                    return existingDesc; // Caller already knows that no metadata was changed
+                }
+                boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name()));
+                // If mapping an existing table as transactional, set property so that existing
+                // data is correctly read.
+                if (willBeTx) {
+                    newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString());
+                } else {
+                    // If we think we're creating a non transactional table when it's already
+                    // transactional, don't allow.
+                    if (existingDesc.hasCoprocessor(TransactionProcessor.class.getName())) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
+                        .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName))
+                        .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException();
+                    }
+                    newDesc.remove(TxConstants.READ_NON_TX_DATA);
+                }
+                if (existingDesc.equals(newDesc)) {
+                    return null; // Indicate that no metadata was changed
                 }
 
                 modifyTable(tableName, newDesc, true);
@@ -987,7 +1103,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     buf.append(name);
                     buf.append(';');
                 }
-                hasInvalidIndexConfiguration |= isInvalidMutableIndexConfig(result.getValue());
+                isMutableIndexWALCodecInstalled &= !isInvalidMutableIndexConfig(result.getValue());
                 if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(result.getValue())) {
                     minHBaseVersion = MetaDataUtil.decodeHBaseVersion(result.getValue());
                 }
@@ -1096,7 +1212,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             synchronized (latestMetaDataLock) {
                 throwConnectionClosedIfNullMetaData();
-                table = latestMetaData.getTable(new PTableKey(PName.EMPTY_NAME, parentTableName));
+                table = latestMetaData.getTableRef(new PTableKey(PName.EMPTY_NAME, parentTableName)).getTable();
                 latestMetaDataLock.notifyAll();
             }
             if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case
@@ -1271,7 +1387,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             builder.addTableMetadataMutations(mp.toByteString());
                         }
                         builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
-                        instance.createTable(controller, builder.build(), rpcCallback);
+                        CreateTableRequest build = builder.build();
+						instance.createTable(controller, build, rpcCallback);
                         if(controller.getFailedOn() != null) {
                             throw controller.getFailedOn();
                         }
@@ -1460,7 +1577,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (metadata == null) {
                 throwConnectionClosedException();
             }
-            table = metadata.getTable(new PTableKey(tenantId, name));
+            table = metadata.getTableRef(new PTableKey(tenantId, name)).getTable();
             if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case
                 throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
             }
@@ -1505,22 +1622,213 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     
     @Override
     public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> stmtProperties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException {
-        Map<String, Object> tableProps = new HashMap<String, Object>();
-        HTableDescriptor tableDescriptor = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, tableProps);
-        SQLException sqlE = null;
+    	List<Pair<byte[], Map<String, Object>>> families = new ArrayList<>(stmtProperties.size());
+    	Map<String, Object> tableProps = new HashMap<String, Object>();
+        Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
+        Set<HTableDescriptor> origTableDescriptors = Collections.emptySet();
+        boolean nonTxToTx = false;
+        Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, families, tableProps);
+        HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond();
+        HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst();
         if (tableDescriptor != null) {
-            try {
-                boolean modifyHTable = true;
-                if (table.getType() == PTableType.VIEW) {
-                    boolean canViewsAddNewCF = props.getBoolean(QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE,
-                            QueryServicesOptions.DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE);
-                    // When adding a column to a view, base physical table should only be modified when new column families are being added.  
-                    modifyHTable = canViewsAddNewCF && !existingColumnFamiliesForBaseTable(table.getPhysicalName()).containsAll(colFamiliesForPColumnsToBeAdded);
+            tableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
+            origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
+            tableDescriptors.add(tableDescriptor);
+            origTableDescriptors.add(origTableDescriptor);
+            nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA));
+            /*
+             * If the table was transitioned from non transactional to transactional, we need
+             * to also transition the index tables.
+             */
+            if (nonTxToTx) {
+                updateDescriptorForTx(table, tableProps, tableDescriptor, Boolean.TRUE.toString(), tableDescriptors, origTableDescriptors);
+            }
+        }
+        
+        boolean success = false;
+        boolean metaDataUpdated = !tableDescriptors.isEmpty();
+        boolean pollingNeeded = !(!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty());
+        MetaDataMutationResult result = null;
+        try {
+            boolean modifyHTable = true;
+            if (table.getType() == PTableType.VIEW) {
+                boolean canViewsAddNewCF = props.getBoolean(QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE,
+                        QueryServicesOptions.DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE);
+                // When adding a column to a view, base physical table should only be modified when new column families are being added.  
+                modifyHTable = canViewsAddNewCF && !existingColumnFamiliesForBaseTable(table.getPhysicalName()).containsAll(colFamiliesForPColumnsToBeAdded);
+            }
+            if (modifyHTable) {
+                sendHBaseMetaData(tableDescriptors, pollingNeeded);
+            }
+            
+            // Special case for call during drop table to ensure that the empty column family exists.
+            // In this, case we only include the table header row, as until we add schemaBytes and tableBytes
+            // as args to this function, we have no way of getting them in this case.
+            // TODO: change to  if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes
+            // Also, could be used to update property values on ALTER TABLE t SET prop=xxx
+            if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) {
+                return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table);
+            }
+             byte[][] rowKeyMetaData = new byte[3][];
+             PTableType tableType = table.getType();
+    
+            Mutation m = tableMetaData.get(0);
+            byte[] rowKey = m.getRow();
+            SchemaUtil.getVarChars(rowKey, rowKeyMetaData);
+            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+            byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
+            
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            result =  metaDataCoprocessorExec(tableKey,
+                new Batch.Call<MetaDataService, MetaDataResponse>() {
+                    @Override
+                    public MetaDataResponse call(MetaDataService instance) throws IOException {
+                        ServerRpcController controller = new ServerRpcController();
+                        BlockingRpcCallback<MetaDataResponse> rpcCallback =
+                                new BlockingRpcCallback<MetaDataResponse>();
+                        AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
+                        for (Mutation m : tableMetaData) {
+                            MutationProto mp = ProtobufUtil.toProto(m);
+                            builder.addTableMetadataMutations(mp.toByteString());
+                        }
+                        builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
+                        instance.addColumn(controller, builder.build(), rpcCallback);
+                        if(controller.getFailedOn() != null) {
+                            throw controller.getFailedOn();
+                        }
+                        return rpcCallback.get();
+                    }
+                });
+    
+            if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND || result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) { // Success
+                success = true;
+                // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE
+                if (  MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr)
+                   && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) {
+                    flushTable(table.getPhysicalName().getBytes());
+                }
+    
+                if (tableType == PTableType.TABLE) {
+                    // If we're changing MULTI_TENANT to true or false, create or drop the view index table
+                    if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){
+                        long timestamp = MetaDataUtil.getClientTimeStamp(m);
+                        if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) {
+                            this.ensureViewIndexTableCreated(table, timestamp);
+                        } else {
+                            this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp);
+                        }
+                    }
                 }
-                boolean pollingNotNeeded = (!tableProps.isEmpty() && !existingColumnFamilies(table).containsAll(colFamiliesForPColumnsToBeAdded));
-                if (modifyHTable) {
-                    modifyTable(table.getPhysicalName().getBytes(), tableDescriptor, !pollingNotNeeded);
+            }
+        } finally {
+            // If we weren't successful with our metadata update
+            // and we've already pushed the HBase metadata changes to the server
+            // and we've tried to go from non transactional to transactional
+            // then we must undo the metadata change otherwise the table will
+            // no longer function correctly.
+            // Note that if this fails, we're in a corrupt state.
+            if (!success && metaDataUpdated && nonTxToTx) {
+                sendHBaseMetaData(origTableDescriptors, pollingNeeded);
+            }
+        }
+        return result;
+    }
+    private void updateDescriptorForTx(PTable table, Map<String, Object> tableProps, HTableDescriptor tableDescriptor,
+            String txValue, Set<HTableDescriptor> descriptorsToUpdate, Set<HTableDescriptor> origDescriptors) throws SQLException {
+        HBaseAdmin admin = null;
+        byte[] physicalTableName = table.getPhysicalName().getBytes();
+        try {
+            admin = new HBaseAdmin(config);
+            setTransactional(tableDescriptor, table.getType(), txValue, tableProps);
+            Map<String, Object> indexTableProps;
+            if (txValue == null) {
+                indexTableProps = Collections.<String,Object>emptyMap();
+            } else {
+                indexTableProps = Maps.newHashMapWithExpectedSize(1);
+                indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue));
+            }
+            for (PTable index : table.getIndexes()) {
+                HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes());
+                origDescriptors.add(indexDescriptor);
+                indexDescriptor = new HTableDescriptor(indexDescriptor);
+                descriptorsToUpdate.add(indexDescriptor);
+                if (index.getColumnFamilies().isEmpty()) {
+                    byte[] dataFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+                    byte[] indexFamilyName = SchemaUtil.getEmptyColumnFamily(index);
+                    HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(indexFamilyName);
+                    HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(dataFamilyName);
+                    indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
+                    indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                } else {
+                    for (PColumnFamily family : index.getColumnFamilies()) {
+                        byte[] familyName = family.getName().getBytes();
+                        indexDescriptor.getFamily(familyName).setMaxVersions(tableDescriptor.getFamily(familyName).getMaxVersions());
+                        HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
+                        HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
+                        indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
+                        indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                    }
                 }
+                setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps);
+            }
+            try {
+                HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getViewIndexPhysicalName(physicalTableName));
+                origDescriptors.add(indexDescriptor);
+                indexDescriptor = new HTableDescriptor(indexDescriptor);
+                descriptorsToUpdate.add(indexDescriptor);
+                setSharedIndexMaxVersion(table, tableDescriptor, indexDescriptor);
+                setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps);
+            } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
+                // Ignore, as we may never have created a view index table
+            }
+            try {
+                HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getLocalIndexPhysicalName(physicalTableName));
+                origDescriptors.add(indexDescriptor);
+                indexDescriptor = new HTableDescriptor(indexDescriptor);
+                descriptorsToUpdate.add(indexDescriptor);
+                setSharedIndexMaxVersion(table, tableDescriptor, indexDescriptor);
+                setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps);
+            } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
+                // Ignore, as we may never have created a view index table
+            }
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (admin != null) admin.close();
+            } catch (IOException e) {
+                logger.warn("Could not close admin",e);
+            }
+        }
+    }
+    private void setSharedIndexMaxVersion(PTable table, HTableDescriptor tableDescriptor,
+            HTableDescriptor indexDescriptor) {
+        if (table.getColumnFamilies().isEmpty()) {
+            byte[] familyName = SchemaUtil.getEmptyColumnFamily(table);
+            HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
+            HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
+            indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
+            indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+        } else {
+            for (PColumnFamily family : table.getColumnFamilies()) {
+                byte[] familyName = family.getName().getBytes();
+                HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
+                if (indexColDescriptor != null) {
+                    HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
+                    indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
+                    indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                }
+            }
+        }
+    }
+    
+    private void sendHBaseMetaData(Set<HTableDescriptor> tableDescriptors, boolean pollingNeeded) throws SQLException {
+        SQLException sqlE = null;
+        for (HTableDescriptor descriptor : tableDescriptors) {
+            try {
+                modifyTable(descriptor.getName(), descriptor, pollingNeeded);
             } catch (IOException e) {
                 sqlE = ServerUtil.parseServerException(e);
             } catch (InterruptedException e) {
@@ -1535,76 +1843,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
             }
         }
-        
-        // Special case for call during drop table to ensure that the empty column family exists.
-        // In this, case we only include the table header row, as until we add schemaBytes and tableBytes
-        // as args to this function, we have no way of getting them in this case.
-        // TODO: change to  if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes
-        // Also, could be used to update property values on ALTER TABLE t SET prop=xxx
-        if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) {
-            return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table);
-        }
-         byte[][] rowKeyMetaData = new byte[3][];
-         PTableType tableType = table.getType();
-
-        Mutation m = tableMetaData.get(0);
-        byte[] rowKey = m.getRow();
-        SchemaUtil.getVarChars(rowKey, rowKeyMetaData);
-        byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
-        byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
-        byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-        byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
-        
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        MetaDataMutationResult result =  metaDataCoprocessorExec(tableKey,
-            new Batch.Call<MetaDataService, MetaDataResponse>() {
-                @Override
-                public MetaDataResponse call(MetaDataService instance) throws IOException {
-                    ServerRpcController controller = new ServerRpcController();
-                    BlockingRpcCallback<MetaDataResponse> rpcCallback =
-                            new BlockingRpcCallback<MetaDataResponse>();
-                    AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
-                    for (Mutation m : tableMetaData) {
-                        MutationProto mp = ProtobufUtil.toProto(m);
-                        builder.addTableMetadataMutations(mp.toByteString());
-                    }
-                    builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
-                    instance.addColumn(controller, builder.build(), rpcCallback);
-                    if(controller.getFailedOn() != null) {
-                        throw controller.getFailedOn();
-                    }
-                    return rpcCallback.get();
-                }
-            });
-
-        if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND) { // Success
-            // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE
-            if (  MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr)
-               && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) {
-                flushTable(table.getPhysicalName().getBytes());
-            }
-
-            if (tableType == PTableType.TABLE) {
-                // If we're changing MULTI_TENANT to true or false, create or drop the view index table
-                if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){
-                    long timestamp = MetaDataUtil.getClientTimeStamp(m);
-                    if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) {
-                        this.ensureViewIndexTableCreated(table, timestamp);
-                    } else {
-                        this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp);
-                    }
-                }
-            }
+    }
+    private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException {
+        if (txValue == null) {
+            tableDescriptor.remove(TxConstants.READ_NON_TX_DATA);
+        } else {
+            tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue);
         }
-        return result;
+        this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
     }
     
-    private HTableDescriptor separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, Map<String, Object> tableProps) throws SQLException {
+    private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException {
         Map<String, Map<String, Object>> stmtFamiliesPropsMap = new HashMap<>(properties.size());
         Map<String,Object> commonFamilyProps = new HashMap<>();
-        boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && colFamiliesForPColumnsToBeAdded.size() > 0;
+        boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && !colFamiliesForPColumnsToBeAdded.isEmpty();
         HashSet<String> existingColumnFamilies = existingColumnFamilies(table);
         Map<String, Map<String, Object>> allFamiliesProps = new HashMap<>(existingColumnFamilies.size());
+        boolean isTransactional = table.isTransactional();
+        boolean willBeTransactional = false;
+        boolean isOrWillBeTransactional = isTransactional;
+        Integer newTTL = null;
         for (String family : properties.keySet()) {
             List<Pair<String, Object>> propsList = properties.get(family);
             if (propsList != null && propsList.size() > 0) {
@@ -1630,22 +1888,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         if (TableProperty.isPhoenixTableProperty(propName)) {
                             TableProperty.valueOf(propName).validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType());
                             if (propName.equals(TTL)) {
+                                newTTL = ((Number)prop.getSecond()).intValue();
                                 // Even though TTL is really a HColumnProperty we treat it specially.
                                 // We enforce that all column families have the same TTL.
                                 commonFamilyProps.put(propName, prop.getSecond());
+                            } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) {
+                                willBeTransactional = isOrWillBeTransactional = true;
+                                tableProps.put(TxConstants.READ_NON_TX_DATA, propValue);
                             }
                         } else {
                             if (isHColumnProperty(propName)) {
                                 if (family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
-                                    commonFamilyProps.put(propName, prop.getSecond());
+                                    commonFamilyProps.put(propName, propValue);
                                 } else {
-                                    colFamilyPropsMap.put(propName, prop.getSecond());
+                                    colFamilyPropsMap.put(propName, propValue);
                                 }
                             } else {
                                 // invalid property - neither of HTableProp, HColumnProp or PhoenixTableProp
                                 // FIXME: This isn't getting triggered as currently a property gets evaluated 
                                 // as HTableProp if its neither HColumnProp or PhoenixTableProp.
-                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE)
+                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY)
                                 .setMessage("Column Family: " + family + ", Property: " + propName).build()
                                 .buildException();
                             }
@@ -1728,16 +1990,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             allFamiliesProps.put(Bytes.toString(table.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : table.getDefaultFamilyName().getBytes() ), commonFamilyProps);
         }
 
-
         // Views are not allowed to have any of these properties.
         if (table.getType() == PTableType.VIEW && (!stmtFamiliesPropsMap.isEmpty() || !commonFamilyProps.isEmpty() || !tableProps.isEmpty())) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
             .buildException();
         }
+        
         HTableDescriptor newTableDescriptor = null;
+        HTableDescriptor origTableDescriptor = null;
         if (!allFamiliesProps.isEmpty() || !tableProps.isEmpty()) {
             byte[] tableNameBytes = Bytes.toBytes(table.getPhysicalName().getString());
-            HTableDescriptor existingTableDescriptor = getTableDescriptor(tableNameBytes);
+            HTableDescriptor existingTableDescriptor = origTableDescriptor = getTableDescriptor(tableNameBytes);
             newTableDescriptor = new HTableDescriptor(existingTableDescriptor);
             if (!tableProps.isEmpty()) {
                 // add all the table properties to the existing table descriptor
@@ -1747,21 +2010,97 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
             if (addingColumns) {
                 // Make sure that all the CFs of the table have the same TTL as the empty CF. 
-                setTTLToEmptyCFTTL(allFamiliesProps, table, newTableDescriptor);
+                setTTLForNewCFs(allFamiliesProps, table, newTableDescriptor, newTTL);
+            }
+            // Set TTL on all table column families, even if they're not referenced here
+            if (newTTL != null) {
+                for (PColumnFamily family : table.getColumnFamilies()) {
+                    if (!allFamiliesProps.containsKey(family.getName().getString())) {
+                        Map<String,Object> familyProps = Maps.newHashMapWithExpectedSize(1);
+                        familyProps.put(TTL, newTTL);
+                        allFamiliesProps.put(family.getName().getString(), familyProps);
+                    }
+                }
+            }
+            Integer defaultTxMaxVersions = null;
+            if (isOrWillBeTransactional) {
+                // Calculate default for max versions
+                Map<String, Object> emptyFamilyProps = allFamiliesProps.get(SchemaUtil.getEmptyColumnFamilyAsString(table));
+                if (emptyFamilyProps != null) {
+                    defaultTxMaxVersions = (Integer)emptyFamilyProps.get(HConstants.VERSIONS);
+                }
+                if (defaultTxMaxVersions == null) {
+                    if (isTransactional) {
+                        defaultTxMaxVersions = newTableDescriptor.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getMaxVersions();
+                    } else {
+                        defaultTxMaxVersions = 
+                                this.getProps().getInt(
+                                        QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB,
+                                        QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL);
+                    }
+                }
+                if (willBeTransactional) {
+                    // Set VERSIONS for all column families when transitioning to transactional
+                    for (PColumnFamily family : table.getColumnFamilies()) {
+                        if (!allFamiliesProps.containsKey(family.getName().getString())) {
+                            Map<String,Object> familyProps = Maps.newHashMapWithExpectedSize(1);
+                            familyProps.put(HConstants.VERSIONS, defaultTxMaxVersions);
+                            allFamiliesProps.put(family.getName().getString(), familyProps);
+                        }
+                    }
+                }
+            }
+            // Set Tephra's TTL property based on HBase property if we're 
+            // transitioning to become transactional or setting TTL on
+            // an already transactional table.
+            if (isOrWillBeTransactional) {
+                int ttl = getTTL(table, newTableDescriptor, newTTL);
+                if (ttl != HColumnDescriptor.DEFAULT_TTL) {
+                    for (Map.Entry<String, Map<String, Object>> entry : allFamiliesProps.entrySet()) {
+                        Map<String, Object> props = entry.getValue();
+                        if (props == null) {
+                            props = new HashMap<String, Object>();
+                        }
+                        props.put(TxConstants.PROPERTY_TTL, ttl);
+                        // Remove HBase TTL if we're not transitioning an existing table to become transactional
+                        // or if the existing transactional table wasn't originally non transactional.
+                        if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(TxConstants.READ_NON_TX_DATA))) {
+                            props.remove(TTL);
+                        }
+                    }
+                }
             }
             for (Entry<String, Map<String, Object>> entry : allFamiliesProps.entrySet()) {
-                byte[] cf = entry.getKey().getBytes();
+                Map<String,Object> familyProps = entry.getValue();
+                if (isOrWillBeTransactional) {
+                    if (!familyProps.containsKey(HConstants.VERSIONS)) {
+                        familyProps.put(HConstants.VERSIONS, defaultTxMaxVersions);
+                    }
+                }
+                byte[] cf = Bytes.toBytes(entry.getKey());
                 HColumnDescriptor colDescriptor = newTableDescriptor.getFamily(cf);
                 if (colDescriptor == null) {
                     // new column family
-                    colDescriptor = generateColumnFamilyDescriptor(new Pair<>(cf, entry.getValue()), table.getType());
+                    colDescriptor = generateColumnFamilyDescriptor(new Pair<>(cf, familyProps), table.getType());
                     newTableDescriptor.addFamily(colDescriptor);
                 } else {
-                    modifyColumnFamilyDescriptor(colDescriptor, entry.getValue());
+                    modifyColumnFamilyDescriptor(colDescriptor, familyProps);
+                }
+                if (isOrWillBeTransactional) {
+                    checkTransactionalVersionsValue(colDescriptor);
                 }
             }
         }
-        return newTableDescriptor;
+        return new Pair<>(origTableDescriptor, newTableDescriptor);
+    }
+    
+    private void checkTransactionalVersionsValue(HColumnDescriptor colDescriptor) throws SQLException {
+        int maxVersions = colDescriptor.getMaxVersions();
+        if (maxVersions <= 1) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE)
+            .setFamilyName(colDescriptor.getNameAsString())
+            .build().buildException();
+        }
     }
     
     private boolean isHColumnProperty(String propName) {
@@ -1775,7 +2114,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private HashSet<String> existingColumnFamiliesForBaseTable(PName baseTableName) throws TableNotFoundException {
         synchronized (latestMetaDataLock) {
             throwConnectionClosedIfNullMetaData();
-            PTable table = latestMetaData.getTable(new PTableKey(null, baseTableName.getString()));
+            PTable table = latestMetaData.getTableRef(new PTableKey(null, baseTableName.getString())).getTable();
             latestMetaDataLock.notifyAll();
             return existingColumnFamilies(table);
         }
@@ -1790,20 +2129,23 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return cfNames;
     }
 
-    private int getTTLForEmptyCf(byte[] emptyCf, byte[] tableNameBytes, HTableDescriptor tableDescriptor) throws SQLException {
-        if (tableDescriptor == null) {
-            tableDescriptor = getTableDescriptor(tableNameBytes);
-        }
-        return tableDescriptor.getFamily(emptyCf).getTimeToLive();
+    private static int getTTL(PTable table, HTableDescriptor tableDesc, Integer newTTL) throws SQLException {
+        // If we're setting TTL now, then use that value. Otherwise, use empty column family value
+        int ttl = newTTL != null ? newTTL 
+                : tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getTimeToLive();
+        return ttl;
     }
     
-    private void setTTLToEmptyCFTTL(Map<String, Map<String, Object>> familyProps, PTable table, 
-            HTableDescriptor tableDesc) throws SQLException {
+    private static void setTTLForNewCFs(Map<String, Map<String, Object>> familyProps, PTable table, 
+            HTableDescriptor tableDesc, Integer newTTL) throws SQLException {
         if (!familyProps.isEmpty()) {
-            int emptyCFTTL = getTTLForEmptyCf(SchemaUtil.getEmptyColumnFamily(table), table.getPhysicalName().getBytes(), tableDesc);
-            for (String family : familyProps.keySet()) {
-                Map<String, Object> props = familyProps.get(family) != null ? familyProps.get(family) : new HashMap<String, Object>();
-                props.put(TTL, emptyCFTTL);
+            int ttl = getTTL(table, tableDesc, newTTL);
+            for (Map.Entry<String, Map<String, Object>> entry : familyProps.entrySet()) {
+                Map<String, Object> props = entry.getValue();
+                if (props == null) {
+                    props = new HashMap<String, Object>();
+                }
+                props.put(TTL, ttl);
             }
         }
     }
@@ -1860,7 +2202,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
         // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again
-        PhoenixConnection metaConnection = new PhoenixConnection(this, oldMetaConnection.getURL(), props, oldMetaConnection.getMetaDataCache());
+        PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props);
         SQLException sqlE = null;
         try {
             metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD " + (addIfNotExists ? " IF NOT EXISTS " : "") + columns );
@@ -1938,7 +2280,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 String columnsToAdd = "";
                                 if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
                                     // We know that we always need to add the STORE_NULLS column for 4.3 release
-                                    columnsToAdd = ", " + PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName();
+                                    columnsToAdd += "," + PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName();
                                     HBaseAdmin admin = null;
                                     try {
                                         admin = getAdmin();
@@ -2020,6 +2362,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                     metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                             MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd, false);
                                 }
+                                if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
+                                    columnsToAdd = PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName();
+                                    metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, columnsToAdd, false);
+                                }
                                 
                             }
                             int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
@@ -2138,8 +2485,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
-    public boolean hasInvalidIndexConfiguration() {
-        return hasInvalidIndexConfiguration;
+    public boolean isMutableIndexWALCodecInstalled() {
+        return isMutableIndexWALCodecInstalled;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 87fb88e..77e4ba9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -24,9 +24,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -81,6 +83,10 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SequenceUtil;
 
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.inmemory.InMemoryTxSystemClient;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -99,18 +105,37 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     private PMetaData metaData;
     private final Map<SequenceKey, SequenceInfo> sequenceMap = Maps.newHashMap();
     private final String userName;
+    private final TransactionSystemClient txSystemClient;
     private KeyValueBuilder kvBuilder;
     private volatile boolean initialized;
     private volatile SQLException initializationException;
     private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap();
     
-    public ConnectionlessQueryServicesImpl(QueryServices queryServices, ConnectionInfo connInfo) {
-        super(queryServices);
+    public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) {
+        super(services);
         userName = connInfo.getPrincipal();
         metaData = newEmptyMetaData();
         
         // Use KeyValueBuilder that builds real KeyValues, as our test utils require this
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
+        Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        for (Entry<String,String> entry : services.getProps()) {
+            config.set(entry.getKey(), entry.getValue());
+        }
+        if (info != null) {
+            for (Object key : info.keySet()) {
+                config.set((String) key, info.getProperty((String) key));
+            }
+        }
+        for (Entry<String,String> entry : connInfo.asProps()) {
+            config.set(entry.getKey(), entry.getValue());
+        }
+
+        // Without making a copy of the configuration we cons up, we lose some of our properties
+        // on the server side during testing.
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
+        TransactionManager txnManager = new TransactionManager(config);
+        this.txSystemClient = new InMemoryTxSystemClient(txnManager);
     }
 
     private PMetaData newEmptyMetaData() {
@@ -141,14 +166,19 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
-    public PMetaData addTable(PTable table) throws SQLException {
-        return metaData = metaData.addTable(table);
+    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
+        return metaData = metaData.addTable(table, resolvedTime);
+    }
+    
+    @Override
+    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+        return metaData = metaData.updateResolvedTimestamp(table, resolvedTimestamp);
     }
 
     @Override
     public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
-            long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
-        return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
+            long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
+        return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
     }
 
     @Override
@@ -159,8 +189,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
 
     @Override
     public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
-            long tableSeqNum) throws SQLException {
-        return metaData = metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum);
+            long tableSeqNum, long resolvedTime) throws SQLException {
+        return metaData = metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
     }
 
     
@@ -175,7 +205,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         // to get anything from the server (since we don't have a connection)
         try {
             String fullTableName = SchemaUtil.getTableName(schemaBytes, tableBytes);
-            PTable table = metaData.getTable(new PTableKey(tenantId, fullTableName));
+            PTable table = metaData.getTableRef(new PTableKey(tenantId, fullTableName)).getTable();
             return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, table, true);
         } catch (TableNotFoundException e) {
             return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 0, null);
@@ -345,7 +375,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         String indexName = Bytes.toString(rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
         String indexTableName = SchemaUtil.getTableName(schemaName, indexName);
         PName tenantId = tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes);
-        PTable index = metaData.getTable(new PTableKey(tenantId, indexTableName));
+        PTable index = metaData.getTableRef(new PTableKey(tenantId, indexTableName)).getTable();
         index = PTableImpl.makePTable(index,newState == PIndexState.USABLE ? PIndexState.ACTIVE : newState == PIndexState.UNUSABLE ? PIndexState.INACTIVE : newState);
         return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, index);
     }
@@ -360,8 +390,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
-    public boolean hasInvalidIndexConfiguration() {
-        return false;
+    public boolean isMutableIndexWALCodecInstalled() {
+        return true;
     }
 
     @Override
@@ -492,6 +522,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
+    public TransactionSystemClient getTransactionSystemClient() {
+        return txSystemClient;
+    }
+ 
     public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
             throws SQLException {
         return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 4153652..4952355 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -46,6 +46,8 @@ import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.PTableStats;
 
+import co.cask.tephra.TransactionSystemClient;
+
 
 public class DelegateConnectionQueryServices extends DelegateQueryServices implements ConnectionQueryServices {
 
@@ -74,14 +76,19 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
-    public PMetaData addTable(PTable table) throws SQLException {
-        return getDelegate().addTable(table);
+    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
+        return getDelegate().addTable(table, resolvedTime);
+    }
+    
+    @Override
+    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+        return getDelegate().updateResolvedTimestamp(table, resolvedTimestamp);
     }
 
     @Override
     public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
-            long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
-        return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
+            long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
+        return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
     }
 
     @Override
@@ -92,8 +99,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
 
     @Override
     public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
-            long tableSeqNum) throws SQLException {
-        return getDelegate().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum);
+            long tableSeqNum, long resolvedTime) throws SQLException {
+        return getDelegate().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
     }
 
     @Override
@@ -165,8 +172,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
-    public boolean hasInvalidIndexConfiguration() {
-        return getDelegate().hasInvalidIndexConfiguration();
+    public boolean isMutableIndexWALCodecInstalled() {
+        return getDelegate().isMutableIndexWALCodecInstalled();
     }
 
     @Override
@@ -254,6 +261,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
+    public TransactionSystemClient getTransactionSystemClient() {
+        return getDelegate().getTransactionSystemClient();
+    }
+
     public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
             throws SQLException {
         return getDelegate().createFunction(functionData, function, temporary);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
index 76e7593..8e7a70d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
@@ -35,10 +35,11 @@ import org.apache.phoenix.schema.PTable;
  * @since 0.1
  */
 public interface MetaDataMutated {
-    PMetaData addTable(PTable table) throws SQLException;
+    PMetaData addTable(PTable table, long resolvedTime) throws SQLException;
+    PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException;
     PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException;
-    PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException;
-    PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException;
+    PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException;
+    PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException;
     PMetaData addFunction(PFunction function) throws SQLException;
     PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) throws SQLException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 268bfc1..1cac37f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -96,6 +96,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
@@ -112,6 +113,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
@@ -172,9 +174,15 @@ public interface QueryConstants {
     public final static int MILLIS_IN_DAY = 1000 * 60 * 60 * 24;
 
     public static final String EMPTY_COLUMN_NAME = "_0";
+    // For transactional tables, the value of our empty key value can no longer be empty
+    // since empty values are treated as column delete markers.
     public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME);
     public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_BYTES);
+    public final static String EMPTY_COLUMN_VALUE = "x";
+    public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
+    public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
+            EMPTY_COLUMN_VALUE_BYTES);
 
     public static final String DEFAULT_COLUMN_FAMILY = "0";
     public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
@@ -251,13 +259,15 @@ public interface QueryConstants {
             STORE_NULLS + " BOOLEAN," +
             BASE_COLUMN_COUNT + " INTEGER," +
             // Column metadata (will be null for table row)
-            IS_ROW_TIMESTAMP + " BOOLEAN, " + 
+            IS_ROW_TIMESTAMP + " BOOLEAN, " +
+            TRANSACTIONAL + " BOOLEAN," +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
             // Install split policy to prevent a tenant's metadata from being split across regions.
-            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + 
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 
     public static final String CREATE_STATS_TABLE_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" +
@@ -278,7 +288,8 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_STAT_DATA_VERSIONS + ",\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
             // Install split policy to prevent a physical table's stats from being split across regions.
-            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + 
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 
     public static final String CREATE_SEQUENCE_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" +
@@ -296,7 +307,8 @@ public interface QueryConstants {
             LIMIT_REACHED_FLAG + " BOOLEAN \n" +
             " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
-            HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n";
+            HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 
     public static final String CREATE_FUNCTION_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"(\n" +
@@ -320,6 +332,7 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+
             // Install split policy to prevent a tenant's metadata from being split across regions.
-            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + 
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index df8cdc3..5f42ff8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -159,6 +159,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay";
     public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
     public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
+    public static final String DEFAULT_TRANSACTIONAL_ATTRIB = "phoenix.transactions.default.enabled";
     public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
     
     // rpc queue configs
@@ -172,6 +173,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String RETURN_SEQUENCE_VALUES_ATTRIB = "phoenix.sequence.returnValues";
     public static final String EXTRA_JDBC_ARGUMENTS_ATTRIB = "phoenix.jdbc.extra.arguments";
     
+    public static final String MAX_VERSIONS_TRANSACTIONAL_ATTRIB = "phoenix.transactions.maxVersions";
+
     // queryserver configuration keys
     public static final String QUERY_SERVER_SERIALIZATION_ATTRIB = "phoenix.queryserver.serialization";
     public static final String QUERY_SERVER_META_FACTORY_ATTRIB = "phoenix.queryserver.metafactory.class";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0434ac9..8c2f895 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -116,7 +116,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
     public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = DEFAULT_MAX_QUERY_CONCURRENCY;
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
-    public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
+    public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
     // Only the first chunked batches are fetched in parallel, so this default
     // should be on the relatively bigger side of things. Bigger means more
@@ -190,7 +190,9 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_STORE_NULLS = false;
 
     // TODO Change this to true as part of PHOENIX-1543
+    // We'll also need this for transactions to work correctly
     public static final boolean DEFAULT_AUTO_COMMIT = false;
+    public static final boolean DEFAULT_TRANSACTIONAL = false;
     public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
     
     private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
@@ -202,6 +204,7 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false;
     public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false;
     public static final boolean DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE = true;
+    public static final int DEFAULT_MAX_VERSIONS_TRANSACTIONAL = Integer.MAX_VALUE;
     
     public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false;
     public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = "";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 5a0d63b..7fb90a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -162,8 +162,8 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
-        delegate.getIndexMaintainers(ptr, connection);
+    public boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
+        return delegate.getIndexMaintainers(ptr, connection);
     }
 
     @Override
@@ -238,6 +238,10 @@ public class DelegateTable implements PTable {
     }
 
     @Override
+    public boolean isTransactional() {
+        return delegate.isTransactional();
+    }
+
     public int getBaseColumnCount() {
         return delegate.getBaseColumnCount();
     }