You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/04/13 05:31:19 UTC

[5/5] phoenix git commit: PHOENIX-4605 Support running multiple transaction providers

PHOENIX-4605 Support running multiple transaction providers


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/15fa00fa
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/15fa00fa
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/15fa00fa

Branch: refs/heads/5.x-HBase-2.0
Commit: 15fa00fa59472c5f61a211af5c723b4803a3cac3
Parents: 5d4cb80
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Apr 11 20:06:35 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Apr 12 22:30:56 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/AlterTableWithViewsIT.java  |   6 +-
 .../ConnectionQueryServicesTestImpl.java        |  34 +-
 .../phoenix/tx/FlappingTransactionIT.java       |  11 +-
 .../phoenix/tx/ParameterizedTransactionIT.java  |  12 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |  12 +
 .../org/apache/phoenix/tx/TxCheckpointIT.java   |   5 +-
 .../phoenix/cache/IndexMetaDataCache.java       |   5 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |   3 +-
 .../apache/phoenix/compile/FromCompiler.java    |   2 +-
 .../apache/phoenix/compile/JoinCompiler.java    |   2 +-
 .../compile/TupleProjectionCompiler.java        |   4 +-
 .../apache/phoenix/compile/UnionCompiler.java   |   6 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   3 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  72 +-
 .../phoenix/coprocessor/MetaDataProtocol.java   |   3 +-
 .../coprocessor/MetaDataRegionObserver.java     |   4 +-
 .../PhoenixTransactionalProcessor.java          |  52 --
 .../coprocessor/ServerCachingEndpointImpl.java  |   4 +-
 .../TephraTransactionalProcessor.java           |  52 ++
 .../UngroupedAggregateRegionObserver.java       |  10 +-
 .../coprocessor/generated/PTableProtos.java     | 110 ++-
 .../phoenix/exception/SQLExceptionCode.java     |   4 +
 .../apache/phoenix/execute/BaseQueryPlan.java   |   3 +
 .../apache/phoenix/execute/MutationState.java   |  75 +-
 .../PhoenixTxIndexMutationGenerator.java        |  10 +-
 .../phoenix/expression/ExpressionType.java      | 119 +--
 .../TransactionProviderNameFunction.java        |  81 ++
 .../apache/phoenix/index/IndexMaintainer.java   |   4 +-
 .../index/IndexMetaDataCacheFactory.java        |   2 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java |   1 -
 .../index/PhoenixIndexMetaDataBuilder.java      |   7 +-
 .../NonAggregateRegionScannerFactory.java       |   5 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  12 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   2 +-
 .../index/PhoenixIndexPartialBuildMapper.java   |   4 +-
 .../phoenix/query/ConnectionQueryServices.java  |   4 +
 .../query/ConnectionQueryServicesImpl.java      |  69 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   9 +-
 .../query/DelegateConnectionQueryServices.java  |   7 +-
 .../apache/phoenix/query/QueryConstants.java    | 108 ++-
 .../org/apache/phoenix/query/QueryServices.java |   1 +
 .../phoenix/query/QueryServicesOptions.java     |  10 +-
 .../apache/phoenix/schema/DelegateTable.java    |   8 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 129 ++-
 .../java/org/apache/phoenix/schema/PTable.java  |   2 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  67 +-
 .../apache/phoenix/schema/TableProperty.java    |  18 +
 .../transaction/OmidTransactionContext.java     |  56 +-
 .../transaction/OmidTransactionProvider.java    |  54 +-
 .../transaction/OmidTransactionTable.java       | 380 ---------
 .../transaction/PhoenixTransactionClient.java   |  23 +
 .../transaction/PhoenixTransactionContext.java  | 165 ++--
 .../transaction/PhoenixTransactionProvider.java |  51 ++
 .../transaction/PhoenixTransactionService.java  |  24 +
 .../transaction/PhoenixTransactionalTable.java  | 798 -------------------
 .../transaction/TephraTransactionContext.java   | 198 +----
 .../transaction/TephraTransactionProvider.java  | 161 +++-
 .../transaction/TephraTransactionTable.java     | 360 ---------
 .../phoenix/transaction/TransactionFactory.java |  57 +-
 .../transaction/TransactionProvider.java        |  36 -
 .../org/apache/phoenix/util/PhoenixRuntime.java |   3 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |  15 +
 .../apache/phoenix/util/TransactionUtil.java    |  49 +-
 .../phoenix/execute/CorrelatePlanTest.java      |   4 +-
 .../execute/LiteralResultIteratorPlanTest.java  |   5 +-
 .../java/org/apache/phoenix/query/BaseTest.java |  26 +-
 .../phoenix/query/QueryServicesTestImpl.java    |  15 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |   2 +-
 phoenix-protocol/src/main/PTable.proto          |   1 +
 70 files changed, 1320 insertions(+), 2338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index aeb892e..7e1befe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -36,7 +36,7 @@ import java.util.Collection;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
@@ -748,7 +748,7 @@ public class AlterTableWithViewsIT extends ParallelStatsDisabledIT {
             PName tenantId = isMultiTenant ? PNameFactory.newName("tenant1") : null;
             PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
             Table htable = phoenixConn.getQueryServices().getTable(Bytes.toBytes(baseTableName));
-            assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+            assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
             assertFalse(phoenixConn.getTable(new PTableKey(null, baseTableName)).isTransactional());
             assertFalse(viewConn.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, viewOfTable)).isTransactional());
             
@@ -757,7 +757,7 @@ public class AlterTableWithViewsIT extends ParallelStatsDisabledIT {
             // query the view to force the table cache to be updated
             viewConn.createStatement().execute("SELECT * FROM " + viewOfTable);
             htable = phoenixConn.getQueryServices().getTable(Bytes.toBytes(baseTableName));
-            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
             assertTrue(phoenixConn.getTable(new PTableKey(null, baseTableName)).isTransactional());
             assertTrue(viewConn.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, viewOfTable)).isTransactional());
         } 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index a1ad1ad..3db93b0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,7 +30,13 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.transaction.PhoenixTransactionClient;
+import org.apache.phoenix.transaction.PhoenixTransactionService;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.SQLCloseables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
@@ -42,9 +49,11 @@ import com.google.common.collect.Sets;
  * @since 0.1
  */
 public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl {
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesTestImpl.class);
     protected int NUM_SLAVES_BASE = 1; // number of slaves for the cluster
     // Track open connections to free them on close as unit tests don't always do this.
     private Set<PhoenixConnection> connections = Sets.newHashSet();
+    private final PhoenixTransactionService[] txServices = new PhoenixTransactionService[TransactionFactory.Provider.values().length];
     
     public ConnectionQueryServicesTestImpl(QueryServices services, ConnectionInfo info, Properties props) throws SQLException {
         super(services, info, props);
@@ -68,12 +77,33 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
                 // Make copy to prevent ConcurrentModificationException (TODO: figure out why this is necessary)
                 connections = new ArrayList<>(this.connections);
                 this.connections = Sets.newHashSet();
+                
+                // shut down the tx client service if we created one to support transactions
+                for (PhoenixTransactionService service : txServices) {
+                    if (service != null) {
+                        try {
+                            service.close();
+                        } catch (IOException e) {
+                            logger.warn(e.getMessage(), e);
+                        }
+                    }
+                }
+
             }
             SQLCloseables.closeAll(connections);
-             long unfreedBytes = clearCache();
-             assertEquals("Found unfreed bytes in server-side cache", 0, unfreedBytes);
+            long unfreedBytes = clearCache();
+            assertEquals("Found unfreed bytes in server-side cache", 0, unfreedBytes);
         } finally {
             super.close();
         }
     }
+    
+    @Override
+    public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) {
+        PhoenixTransactionService txService = txServices[provider.ordinal()];
+        if (txService == null) {
+            txService = txServices[provider.ordinal()] = provider.getTransactionProvider().getTransactionService(config, connectionInfo);
+        }
+        return super.initTransactionClient(provider);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 4d8e297..52d58c9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -43,7 +43,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -227,9 +226,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         }
 
         PhoenixTransactionContext txContext =
-              TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
-        PhoenixTransactionalTable txTable =
-              TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable);
+              TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn);
+        Table txTable =
+                txContext.getTransactionalTable(htable, false);
 
         txContext.begin();
 
@@ -279,9 +278,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         
         // Repeat the same as above, but this time abort the transaction
         txContext =
-              TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
+              TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn);
         txTable =
-              TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable);
+              txContext.getTransactionalTable(htable, false);
 
         txContext.begin();
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index b25ec03..1f02386 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -282,9 +282,9 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true");
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName));
-        assertTrue(htable.getDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertTrue(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(index));
-        assertTrue(htable.getDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertTrue(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
 
         conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (4, 'c')");
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL");
@@ -359,7 +359,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         assertFalse(rs.next());
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName));
-        assertFalse(htable.getDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertFalse(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
                 getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)).
                 getColumnFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
@@ -377,7 +377,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         PTable table = pconn.getTable(new PTableKey(null, t1));
         Table htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertTrue(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         
         try {
             ddl = "ALTER TABLE " + t1 + " SET transactional=false";
@@ -411,7 +411,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         table = pconn.getTable(new PTableKey(null, t1));
         htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertTrue(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index e743bfd..88fba47 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -45,6 +46,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -130,6 +132,7 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
             assertTrue(rs.next());
             assertEquals("Transactional table was not marked as transactional in JDBC API.",
                 "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL));
+            assertEquals(TransactionFactory.Provider.TEPHRA.name(), rs.getString(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
 
             String nonTransactTableName = generateUniqueName();
             Statement stmt2 = conn.createStatement();
@@ -140,6 +143,15 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
             assertTrue(rs2.next());
             assertEquals("Non-transactional table was marked as transactional in JDBC API.",
                 "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL));
+            assertNull(rs2.getString(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
+            
+            try {
+                stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " +
+                        "TRANSACTION_PROVIDER=foo");
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.UNKNOWN_TRANSACTION_PROVIDER.getErrorCode(), e.getErrorCode());
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index 5083016..2c71ee3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Ignore;
@@ -267,7 +268,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 		ResultSet rs;
 		MutationState state = conn.unwrap(PhoenixConnection.class)
 				.getMutationState();
-		state.startTransaction();
+		state.startTransaction(TransactionFactory.Provider.TEPHRA);
 		long wp = state.getWritePointer();
 		conn.createStatement().execute(
 				"upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + "");
@@ -331,7 +332,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 			conn.commit();
 
 	        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
-	        state.startTransaction();
+	        state.startTransaction(TransactionFactory.Provider.TEPHRA);
 	        long wp = state.getWritePointer();
 	        conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
 	        assertEquals(PhoenixVisibilityLevel.SNAPSHOT, state.getVisibilityLevel());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index 17e6fb6..9f3dd59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,12 +23,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.util.ScanUtil;
 
 public interface IndexMetaDataCache extends Closeable {
-    public static int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 0);
     public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() {
 
         @Override
@@ -47,7 +46,7 @@ public interface IndexMetaDataCache extends Closeable {
         
         @Override
         public int getClientVersion() {
-            return UNKNOWN_CLIENT_VERSION;
+            return ScanUtil.UNKNOWN_CLIENT_VERSION;
         }
         
     };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 40917e8..5ed4130 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -784,8 +783,8 @@ public class DeleteCompiler {
                     byte[] uuidValue = ServerCacheClient.generateId();
                     context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                     context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                    context.getScan().setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                     context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                    ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION);
                 }
                 ResultIterator iterator = aggPlan.iterator();
                 try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index c84e1d7..1341ecc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -821,7 +821,7 @@ public class FromCompiler {
             PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, PTableType.SUBQUERY, null,
                     MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null,
                     Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false,
-                    false, null, null, null, false, false, 0, 0L, SchemaUtil
+                    false, null, null, null, false, null, 0, 0L, SchemaUtil
                             .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, true);
 
             String alias = subselectNode.getAlias();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 88e8f50..824d933 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1276,7 +1276,7 @@ public class JoinCompiler {
                 left.getBucketNum(), merged, left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(),
                 left.isImmutableRows(), Collections.<PName> emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL,
                 left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(),
-                left.getIndexType(), left.rowKeyOrderOptimizable(), left.isTransactional(),
+                left.getIndexType(), left.rowKeyOrderOptimizable(), left.getTransactionProvider(),
                 left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp(), left.isNamespaceMapped(), 
                 left.getAutoPartitionSeqName(), left.isAppendOnlySchema(), ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, left.useStatsForParallelization());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 6e52cd5..91be356 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -167,7 +167,7 @@ public class TupleProjectionCompiler {
                 table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName> emptyList(),
                 table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(),
                 table.getViewIndexId(),
-                table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), 
+                table.getIndexType(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), 
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
     
@@ -198,7 +198,7 @@ public class TupleProjectionCompiler {
                 table.getBucketNum(), projectedColumns, null, null,
                 Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(),
-                table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(),
+                table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.getTransactionProvider(),
                 table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), cqCounter, table.useStatsForParallelization());
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index d5bfef8..9ca92f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -35,12 +35,12 @@ import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -100,7 +100,7 @@ public class UnionCompiler {
             UNION_SCHEMA_NAME, UNION_TABLE_NAME, PTableType.SUBQUERY, null,
             HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn,
             null, null, projectedColumns, null, null, null, true, null, null, null, true,
-            true, true, null, null, null, false, false, 0, 0L,
+            true, true, null, null, null, false, null, 0, 0L,
             SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY,
                 statement.getConnection().getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, true);
         TableRef tableRef = new TableRef(null, tempTable, 0, false);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index b89e573..ab742b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -1020,8 +1020,8 @@ public class UpsertCompiler {
                 byte[] uuidValue = ServerCacheClient.generateId();
                 scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                scan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                 scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
             }
             ResultIterator iterator = aggPlan.iterator();
             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 3f98032..725e792 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -41,10 +41,8 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.iterate.RegionScannerFactory;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
-import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
@@ -101,6 +99,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
     public final static String QUALIFIER_ENCODING_SCHEME = "_QualifierEncodingScheme";
     public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = "_ImmutableStorageEncodingScheme";
     public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = "_UseEncodedColumnQualifierList";
+    public static final String CLIENT_VERSION = "_ClientVersion";
     
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index f09f6ee..36e1ca1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -69,6 +69,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION_BYTES;
@@ -229,6 +230,7 @@ import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -274,29 +276,30 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
     public static final String ROW_KEY_ORDER_OPTIMIZABLE = "ROW_KEY_ORDER_OPTIMIZABLE";
     public static final byte[] ROW_KEY_ORDER_OPTIMIZABLE_BYTES = Bytes.toBytes(ROW_KEY_ORDER_OPTIMIZABLE);
     // KeyValues for Table
-    private static final Cell TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
-    private static final Cell TABLE_SEQ_NUM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
-    private static final Cell COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES);
-    private static final Cell SALT_BUCKETS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES);
-    private static final Cell PK_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES);
-    private static final Cell DATA_TABLE_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
-    private static final Cell INDEX_STATE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
-    private static final Cell IMMUTABLE_ROWS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES);
-    private static final Cell VIEW_EXPRESSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES);
-    private static final Cell DEFAULT_COLUMN_FAMILY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES);
-    private static final Cell DISABLE_WAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES);
-    private static final Cell MULTI_TENANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
-    private static final Cell VIEW_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
-    private static final Cell VIEW_INDEX_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
-    private static final Cell INDEX_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES);
-    private static final Cell INDEX_DISABLE_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
-    private static final Cell STORE_NULLS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES);
-    private static final Cell EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
-    private static final Cell BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
-    private static final Cell ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
-    private static final Cell TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
-    private static final Cell UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES);
-    private static final Cell IS_NAMESPACE_MAPPED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+    private static final KeyValue TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
+    private static final KeyValue TABLE_SEQ_NUM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
+    private static final KeyValue COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES);
+    private static final KeyValue SALT_BUCKETS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES);
+    private static final KeyValue PK_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES);
+    private static final KeyValue DATA_TABLE_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
+    private static final KeyValue INDEX_STATE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
+    private static final KeyValue IMMUTABLE_ROWS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES);
+    private static final KeyValue VIEW_EXPRESSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES);
+    private static final KeyValue DEFAULT_COLUMN_FAMILY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES);
+    private static final KeyValue DISABLE_WAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES);
+    private static final KeyValue MULTI_TENANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
+    private static final KeyValue VIEW_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
+    private static final KeyValue VIEW_INDEX_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
+    private static final KeyValue INDEX_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES);
+    private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+    private static final KeyValue STORE_NULLS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES);
+    private static final KeyValue EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
+    private static final KeyValue BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
+    private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
+    private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
+    private static final KeyValue TRANSACTION_PROVIDER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTION_PROVIDER_BYTES);
+    private static final KeyValue UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES);
+    private static final KeyValue IS_NAMESPACE_MAPPED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
     private static final Cell AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
     private static final Cell APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
@@ -326,6 +329,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
             BASE_COLUMN_COUNT_KV,
             ROW_KEY_ORDER_OPTIMIZABLE_KV,
             TRANSACTIONAL_KV,
+            TRANSACTION_PROVIDER_KV,
             UPDATE_CACHE_FREQUENCY_KV,
             IS_NAMESPACE_MAPPED_KV,
             AUTO_PARTITION_SEQ_KV,
@@ -357,6 +361,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
     private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
     private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
     private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
+    private static final int TRANSACTION_PROVIDER_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTION_PROVIDER_KV);
     private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV);
     private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV);
     private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
@@ -950,7 +955,24 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
         Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
         boolean storeNulls = storeNullsKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
         Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
-        boolean transactional = transactionalKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(transactionalKv.getValueArray(), transactionalKv.getValueOffset(), transactionalKv.getValueLength()));
+        Cell transactionProviderKv = tableKeyValues[TRANSACTION_PROVIDER_INDEX];
+        TransactionFactory.Provider transactionProvider = null;
+        if (transactionProviderKv == null) {
+            if (transactionalKv != null && Boolean.TRUE.equals(
+                    PBoolean.INSTANCE.toObject(
+                            transactionalKv.getValueArray(), 
+                            transactionalKv.getValueOffset(), 
+                            transactionalKv.getValueLength()))) {
+                // For backward compat, prior to client setting TRANSACTION_PROVIDER
+                transactionProvider = TransactionFactory.Provider.TEPHRA;
+            }
+        } else {
+            transactionProvider = TransactionFactory.Provider.fromCode(
+                    PTinyint.INSTANCE.getCodec().decodeByte(
+                            transactionProviderKv.getValueArray(),
+                            transactionProviderKv.getValueOffset(), 
+                            SortOrder.getDefault()));
+        }
         Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
         ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
         Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
@@ -1033,7 +1055,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
         return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum,
                 pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName,
                 viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
-                rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount,
+                rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency, baseColumnCount,
                 indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounter, useStatsForParallelization);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 9de9fba..906f58f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -95,9 +95,10 @@ public abstract class MetaDataProtocol extends MetaDataService {
     // Since there's no upgrade code, keep the version the same as the previous version
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_TABLE_TIMESTAMP + 28;
     // TODO Was there a system table upgrade?
     // TODO Need to account for the inevitable 4.14 release too
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_0_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_0_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_0_0;
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 116b160..83da368 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -85,6 +84,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
@@ -486,7 +486,7 @@ public class MetaDataRegionObserver implements RegionObserver,RegionCoprocessor
 									conn);
 							byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
 							dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-							dataTableScan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
+							ScanUtil.setClientVersion(dataTableScan, MetaDataProtocol.PHOENIX_VERSION);
                             LOG.info("Starting to partially build indexes:" + indexesToPartiallyRebuild
                                     + " on data table:" + dataPTable.getName() + " with the earliest disable timestamp:"
                                     + earliestDisableTimestamp + " till "

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
deleted file mode 100644
index 8e7f0f9..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.coprocessor;
-
-import java.io.IOException;
-import java.util.Optional;
-
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.phoenix.transaction.TransactionFactory;
-
-public class PhoenixTransactionalProcessor extends DelegateRegionObserver implements RegionCoprocessor {
-
-    @Override
-    public Optional<RegionObserver> getRegionObserver() {
-        return Optional.of(this);
-    }
-
-    public PhoenixTransactionalProcessor() {
-        super(TransactionFactory.getTransactionProvider().getTransactionContext().getCoprocessor());
-    }
-
-    @Override
-    public void start(CoprocessorEnvironment env) throws IOException {
-        if (delegate instanceof RegionCoprocessor) {
-            ((RegionCoprocessor)delegate).start(env);
-        }
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) throws IOException {
-        if (delegate instanceof RegionCoprocessor) {
-            ((RegionCoprocessor)delegate).stop(env);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index b586d23..46b847b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
@@ -37,6 +36,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachin
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.protobuf.RpcCallback;
@@ -80,7 +80,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements R
           ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
           tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()),
               cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer(),
-              request.hasClientVersion() ? request.getClientVersion() : IndexMetaDataCache.UNKNOWN_CLIENT_VERSION);
+              request.hasClientVersion() ? request.getClientVersion() : ScanUtil.UNKNOWN_CLIENT_VERSION);
         } catch (Throwable e) {
             ProtobufUtil.setControllerException(controller,
                 ServerUtil.createIOException("Error when adding cache: ", e));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
new file mode 100644
index 0000000..712904a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+public class TephraTransactionalProcessor extends DelegateRegionObserver implements RegionCoprocessor {
+
+    public TephraTransactionalProcessor() {
+        super(new TransactionProcessor());
+    }
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+        return Optional.of(this);
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        if (delegate instanceof RegionCoprocessor) {
+            ((RegionCoprocessor)delegate).start(env);
+        }
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+        if (delegate instanceof RegionCoprocessor) {
+            ((RegionCoprocessor)delegate).stop(env);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 37d2b4d..3fe1def 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -279,7 +279,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
            }
            if (clientVersionBytes != null) {
-               m.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
+               m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
            }
         }
     }
@@ -526,7 +526,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             useIndexProto = false;
         }
 
-        byte[] clientVersionBytes = scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION);
+        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         boolean acquiredLock = false;
         boolean incrScanRefCount = false;
         final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
@@ -985,7 +985,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             useProto = false;
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
-        byte[] clientVersionBytes = scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION);
+        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         boolean hasMore;
         int rowCount = 0;
         try {
@@ -1010,7 +1010,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     put.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
-                                    put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
+                                    put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(put);
                                     // Since we're replaying existing mutations, it makes no sense to write them to the wal
                                     put.setDurability(Durability.SKIP_WAL);
@@ -1022,7 +1022,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     del.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
-                                    del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
+                                    del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(del);
                                     // Since we're replaying existing mutations, it makes no sense to write them to the wal
                                     del.setDurability(Durability.SKIP_WAL);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 278f301..8d500e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3520,6 +3520,16 @@ public final class PTableProtos {
      * <code>optional bool useStatsForParallelization = 37;</code>
      */
     boolean getUseStatsForParallelization();
+
+    // optional int32 transactionProvider = 38;
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    boolean hasTransactionProvider();
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    int getTransactionProvider();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3771,6 +3781,11 @@ public final class PTableProtos {
               useStatsForParallelization_ = input.readBool();
               break;
             }
+            case 304: {
+              bitField1_ |= 0x00000001;
+              transactionProvider_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3823,6 +3838,7 @@ public final class PTableProtos {
     }
 
     private int bitField0_;
+    private int bitField1_;
     // required bytes schemaNameBytes = 1;
     public static final int SCHEMANAMEBYTES_FIELD_NUMBER = 1;
     private com.google.protobuf.ByteString schemaNameBytes_;
@@ -4534,6 +4550,22 @@ public final class PTableProtos {
       return useStatsForParallelization_;
     }
 
+    // optional int32 transactionProvider = 38;
+    public static final int TRANSACTIONPROVIDER_FIELD_NUMBER = 38;
+    private int transactionProvider_;
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    public boolean hasTransactionProvider() {
+      return ((bitField1_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    public int getTransactionProvider() {
+      return transactionProvider_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4571,6 +4603,7 @@ public final class PTableProtos {
       encodingScheme_ = com.google.protobuf.ByteString.EMPTY;
       encodedCQCounters_ = java.util.Collections.emptyList();
       useStatsForParallelization_ = false;
+      transactionProvider_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4746,6 +4779,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x80000000) == 0x80000000)) {
         output.writeBool(37, useStatsForParallelization_);
       }
+      if (((bitField1_ & 0x00000001) == 0x00000001)) {
+        output.writeInt32(38, transactionProvider_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4904,6 +4940,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(37, useStatsForParallelization_);
       }
+      if (((bitField1_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(38, transactionProvider_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5095,6 +5135,11 @@ public final class PTableProtos {
         result = result && (getUseStatsForParallelization()
             == other.getUseStatsForParallelization());
       }
+      result = result && (hasTransactionProvider() == other.hasTransactionProvider());
+      if (hasTransactionProvider()) {
+        result = result && (getTransactionProvider()
+            == other.getTransactionProvider());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5252,6 +5297,10 @@ public final class PTableProtos {
         hash = (37 * hash) + USESTATSFORPARALLELIZATION_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getUseStatsForParallelization());
       }
+      if (hasTransactionProvider()) {
+        hash = (37 * hash) + TRANSACTIONPROVIDER_FIELD_NUMBER;
+        hash = (53 * hash) + getTransactionProvider();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5448,6 +5497,8 @@ public final class PTableProtos {
         }
         useStatsForParallelization_ = false;
         bitField1_ = (bitField1_ & ~0x00000008);
+        transactionProvider_ = 0;
+        bitField1_ = (bitField1_ & ~0x00000010);
         return this;
       }
 
@@ -5477,6 +5528,7 @@ public final class PTableProtos {
         int from_bitField0_ = bitField0_;
         int from_bitField1_ = bitField1_;
         int to_bitField0_ = 0;
+        int to_bitField1_ = 0;
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
           to_bitField0_ |= 0x00000001;
         }
@@ -5637,7 +5689,12 @@ public final class PTableProtos {
           to_bitField0_ |= 0x80000000;
         }
         result.useStatsForParallelization_ = useStatsForParallelization_;
+        if (((from_bitField1_ & 0x00000010) == 0x00000010)) {
+          to_bitField1_ |= 0x00000001;
+        }
+        result.transactionProvider_ = transactionProvider_;
         result.bitField0_ = to_bitField0_;
+        result.bitField1_ = to_bitField1_;
         onBuilt();
         return result;
       }
@@ -5841,6 +5898,9 @@ public final class PTableProtos {
         if (other.hasUseStatsForParallelization()) {
           setUseStatsForParallelization(other.getUseStatsForParallelization());
         }
+        if (other.hasTransactionProvider()) {
+          setTransactionProvider(other.getTransactionProvider());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7920,6 +7980,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int32 transactionProvider = 38;
+      private int transactionProvider_ ;
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public boolean hasTransactionProvider() {
+        return ((bitField1_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public int getTransactionProvider() {
+        return transactionProvider_;
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public Builder setTransactionProvider(int value) {
+        bitField1_ |= 0x00000010;
+        transactionProvider_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public Builder clearTransactionProvider() {
+        bitField1_ = (bitField1_ & ~0x00000010);
+        transactionProvider_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -8587,7 +8680,7 @@ public final class PTableProtos {
       "\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030",
       "\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(\003\022\025\n\rke" +
       "yBytesCount\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001" +
-      "(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts\"\220\007" +
+      "(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts\"\255\007" +
       "\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tab" +
       "leNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.P" +
       "TableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenc" +
@@ -8610,12 +8703,13 @@ public final class PTableProtos {
       "\n\017parentNameBytes\030! \001(\014\022\025\n\rstorageScheme" +
       "\030\" \001(\014\022\026\n\016encodingScheme\030# \001(\014\022,\n\021encode" +
       "dCQCounters\030$ \003(\0132\021.EncodedCQCounter\022\"\n\032" +
-      "useStatsForParallelization\030% \001(\010\"6\n\020Enco" +
-      "dedCQCounter\022\021\n\tcolFamily\030\001 \002(\t\022\017\n\007count" +
-      "er\030\002 \002(\005*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004US" +
-      "ER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(o" +
-      "rg.apache.phoenix.coprocessor.generatedB",
-      "\014PTableProtosH\001\210\001\001\240\001\001"
+      "useStatsForParallelization\030% \001(\010\022\033\n\023tran" +
+      "sactionProvider\030& \001(\005\"6\n\020EncodedCQCounte" +
+      "r\022\021\n\tcolFamily\030\001 \002(\t\022\017\n\007counter\030\002 \002(\005*A\n" +
+      "\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE" +
+      "W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p",
+      "hoenix.coprocessor.generatedB\014PTableProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8639,7 +8733,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", "IsNamespaceMapped", "AutoParititonSeqName", "IsAppendOnlySchema", "ParentNameBytes", "StorageScheme", "EncodingScheme", "EncodedCQCounters", "UseStatsForParallelization", });
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", "IsNamespaceMapped", "AutoParititonSeqName", "IsAppendOnlySchema", "ParentNameBytes", "StorageScheme", "EncodingScheme", "EncodedCQCounters", "UseStatsForParallelization", "TransactionProvider", });
           internal_static_EncodedCQCounter_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_EncodedCQCounter_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 61e1307..bde5083 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -294,6 +294,10 @@ public enum SQLExceptionCode {
     SEQUENCE_NOT_CASTABLE_TO_AUTO_PARTITION_ID_COLUMN(1086, "44A17", "Sequence Value not castable to auto-partition id column"),
     CANNOT_COERCE_AUTO_PARTITION_ID(1087, "44A18", "Auto-partition id cannot be coerced"),
     CANNOT_CREATE_INDEX_ON_MUTABLE_TABLE_WITH_ROWTIMESTAMP(1088, "44A19", "Cannot create an index on a mutable table that has a ROW_TIMESTAMP column."),
+    UNKNOWN_TRANSACTION_PROVIDER(1089,"44A20", "Unknown TRANSACTION_PROVIDER: "),
+    CANNOT_START_TXN_IF_TXN_DISABLED(1091, "44A22", "Cannot start transaction if transactions are disabled."),
+    CANNOT_MIX_TXN_PROVIDERS(1092, "44A23", "Cannot mix transaction providers: "),
+    CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for "),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index b152030..8a8c822 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -252,6 +253,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
             return newIterator(scanGrouper, scan, caches);
         }
         
+        ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+        
         // Set miscellaneous scan attributes. This is the last chance to set them before we
         // clone the scan for each parallelized chunk.
         TableRef tableRef = context.getCurrentTable();