You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/04/13 00:13:46 UTC

[4/4] phoenix git commit: PHOENIX-4605 Support running multiple transaction providers

PHOENIX-4605 Support running multiple transaction providers


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7f37966c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7f37966c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7f37966c

Branch: refs/heads/4.x-HBase-1.3
Commit: 7f37966c19c0011f60c2d9b4e6889d5da27a901a
Parents: 73aca1e
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Apr 11 20:06:35 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Apr 12 12:40:05 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/AlterTableWithViewsIT.java  |   6 +-
 .../ConnectionQueryServicesTestImpl.java        |  34 +-
 .../phoenix/tx/FlappingTransactionIT.java       |  11 +-
 .../phoenix/tx/ParameterizedTransactionIT.java  |  14 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |  12 +
 .../org/apache/phoenix/tx/TxCheckpointIT.java   |   5 +-
 .../phoenix/cache/IndexMetaDataCache.java       |   5 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |   3 +-
 .../apache/phoenix/compile/FromCompiler.java    |   2 +-
 .../apache/phoenix/compile/JoinCompiler.java    |   2 +-
 .../compile/TupleProjectionCompiler.java        |   4 +-
 .../apache/phoenix/compile/UnionCompiler.java   |   6 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   4 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  26 +-
 .../phoenix/coprocessor/MetaDataProtocol.java   |   2 +-
 .../coprocessor/MetaDataRegionObserver.java     |   4 +-
 .../PhoenixTransactionalProcessor.java          |  28 --
 .../coprocessor/ServerCachingEndpointImpl.java  |   4 +-
 .../TephraTransactionalProcessor.java           |  29 ++
 .../UngroupedAggregateRegionObserver.java       |  10 +-
 .../coprocessor/generated/PTableProtos.java     | 110 +++++-
 .../phoenix/exception/SQLExceptionCode.java     |   4 +
 .../apache/phoenix/execute/BaseQueryPlan.java   |   3 +
 .../apache/phoenix/execute/MutationState.java   |  74 ++--
 .../PhoenixTxIndexMutationGenerator.java        |  10 +-
 .../phoenix/expression/ExpressionType.java      | 119 +-----
 .../TransactionProviderNameFunction.java        |  81 +++++
 .../apache/phoenix/index/IndexMaintainer.java   |   4 +-
 .../index/IndexMetaDataCacheFactory.java        |   2 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java |   1 -
 .../index/PhoenixIndexMetaDataBuilder.java      |   7 +-
 .../NonAggregateRegionScannerFactory.java       |   5 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  12 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   2 +-
 .../index/PhoenixIndexPartialBuildMapper.java   |   4 +-
 .../phoenix/query/ConnectionQueryServices.java  |   4 +
 .../query/ConnectionQueryServicesImpl.java      |  69 ++--
 .../query/ConnectionlessQueryServicesImpl.java  |   9 +-
 .../query/DelegateConnectionQueryServices.java  |   7 +-
 .../apache/phoenix/query/QueryConstants.java    |   2 +
 .../org/apache/phoenix/query/QueryServices.java |   1 +
 .../phoenix/query/QueryServicesOptions.java     |  10 +-
 .../apache/phoenix/schema/DelegateTable.java    |   8 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 129 +++++--
 .../java/org/apache/phoenix/schema/PTable.java  |   2 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  68 ++--
 .../apache/phoenix/schema/TableProperty.java    |  18 +
 .../transaction/OmidTransactionContext.java     |  57 +--
 .../transaction/OmidTransactionProvider.java    |  54 +--
 .../transaction/OmidTransactionTable.java       | 364 -------------------
 .../transaction/PhoenixTransactionClient.java   |  23 ++
 .../transaction/PhoenixTransactionContext.java  | 169 +++++----
 .../transaction/PhoenixTransactionProvider.java |  51 +++
 .../transaction/PhoenixTransactionService.java  |  24 ++
 .../transaction/PhoenixTransactionalTable.java  | 149 --------
 .../transaction/TephraTransactionContext.java   | 205 +++--------
 .../transaction/TephraTransactionProvider.java  | 161 +++++++-
 .../transaction/TephraTransactionTable.java     | 350 ------------------
 .../phoenix/transaction/TransactionFactory.java |  57 ++-
 .../transaction/TransactionProvider.java        |  36 --
 .../org/apache/phoenix/util/PhoenixRuntime.java |   3 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |  15 +
 .../apache/phoenix/util/TransactionUtil.java    |  49 ++-
 .../phoenix/execute/CorrelatePlanTest.java      |   5 +-
 .../execute/LiteralResultIteratorPlanTest.java  |   5 +-
 .../java/org/apache/phoenix/query/BaseTest.java |  26 +-
 .../phoenix/query/QueryServicesTestImpl.java    |  15 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |   2 +-
 phoenix-protocol/src/main/PTable.proto          |   1 +
 70 files changed, 1176 insertions(+), 1623 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index 6b57148..237a8d2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -36,7 +36,7 @@ import java.util.Collection;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
@@ -748,7 +748,7 @@ public class AlterTableWithViewsIT extends ParallelStatsDisabledIT {
             PName tenantId = isMultiTenant ? PNameFactory.newName("tenant1") : null;
             PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
             HTableInterface htable = phoenixConn.getQueryServices().getTable(Bytes.toBytes(baseTableName));
-            assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+            assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
             assertFalse(phoenixConn.getTable(new PTableKey(null, baseTableName)).isTransactional());
             assertFalse(viewConn.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, viewOfTable)).isTransactional());
             
@@ -757,7 +757,7 @@ public class AlterTableWithViewsIT extends ParallelStatsDisabledIT {
             // query the view to force the table cache to be updated
             viewConn.createStatement().execute("SELECT * FROM " + viewOfTable);
             htable = phoenixConn.getQueryServices().getTable(Bytes.toBytes(baseTableName));
-            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
             assertTrue(phoenixConn.getTable(new PTableKey(null, baseTableName)).isTransactional());
             assertTrue(viewConn.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, viewOfTable)).isTransactional());
         } 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index a1ad1ad..3db93b0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,7 +30,13 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.transaction.PhoenixTransactionClient;
+import org.apache.phoenix.transaction.PhoenixTransactionService;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.SQLCloseables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
@@ -42,9 +49,11 @@ import com.google.common.collect.Sets;
  * @since 0.1
  */
 public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl {
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesTestImpl.class);
     protected int NUM_SLAVES_BASE = 1; // number of slaves for the cluster
     // Track open connections to free them on close as unit tests don't always do this.
     private Set<PhoenixConnection> connections = Sets.newHashSet();
+    private final PhoenixTransactionService[] txServices = new PhoenixTransactionService[TransactionFactory.Provider.values().length];
     
     public ConnectionQueryServicesTestImpl(QueryServices services, ConnectionInfo info, Properties props) throws SQLException {
         super(services, info, props);
@@ -68,12 +77,33 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
                 // Make copy to prevent ConcurrentModificationException (TODO: figure out why this is necessary)
                 connections = new ArrayList<>(this.connections);
                 this.connections = Sets.newHashSet();
+                
+                // shut down the tx client service if we created one to support transactions
+                for (PhoenixTransactionService service : txServices) {
+                    if (service != null) {
+                        try {
+                            service.close();
+                        } catch (IOException e) {
+                            logger.warn(e.getMessage(), e);
+                        }
+                    }
+                }
+
             }
             SQLCloseables.closeAll(connections);
-             long unfreedBytes = clearCache();
-             assertEquals("Found unfreed bytes in server-side cache", 0, unfreedBytes);
+            long unfreedBytes = clearCache();
+            assertEquals("Found unfreed bytes in server-side cache", 0, unfreedBytes);
         } finally {
             super.close();
         }
     }
+    
+    @Override
+    public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) {
+        PhoenixTransactionService txService = txServices[provider.ordinal()];
+        if (txService == null) {
+            txService = txServices[provider.ordinal()] = provider.getTransactionProvider().getTransactionService(config, connectionInfo);
+        }
+        return super.initTransactionClient(provider);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 200cf1c..0323c32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -43,7 +43,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -225,9 +224,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         }
 
         PhoenixTransactionContext txContext =
-              TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
-        PhoenixTransactionalTable txTable =
-              TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable);
+              TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn);
+        HTableInterface txTable =
+                txContext.getTransactionalTable(htable, false);
 
         txContext.begin();
 
@@ -277,9 +276,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         
         // Repeat the same as above, but this time abort the transaction
         txContext =
-              TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
+              TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn);
         txTable =
-              TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable);
+              txContext.getTransactionalTable(htable, false);
 
         txContext.begin();
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index fecfd9a..5421801 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -16,8 +16,8 @@
  * limitations under the License.
  */
 package org.apache.phoenix.tx;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -280,9 +280,9 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true");
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName));
-        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(index));
-        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
 
         conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (4, 'c')");
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL");
@@ -357,7 +357,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         assertFalse(rs.next());
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName));
-        assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
                 getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)).
                 getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
@@ -375,7 +375,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         PTable table = pconn.getTable(new PTableKey(null, t1));
         HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         
         try {
             ddl = "ALTER TABLE " + t1 + " SET transactional=false";
@@ -409,7 +409,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         table = pconn.getTable(new PTableKey(null, t1));
         htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 9286c2e..f1344e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -44,6 +45,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -129,6 +131,7 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
             assertTrue(rs.next());
             assertEquals("Transactional table was not marked as transactional in JDBC API.",
                 "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL));
+            assertEquals(TransactionFactory.Provider.TEPHRA.name(), rs.getString(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
 
             String nonTransactTableName = generateUniqueName();
             Statement stmt2 = conn.createStatement();
@@ -139,6 +142,15 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
             assertTrue(rs2.next());
             assertEquals("Non-transactional table was marked as transactional in JDBC API.",
                 "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL));
+            assertNull(rs2.getString(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
+            
+            try {
+                stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " +
+                        "TRANSACTION_PROVIDER=foo");
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.UNKNOWN_TRANSACTION_PROVIDER.getErrorCode(), e.getErrorCode());
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index bf39dfe..42e2102 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Test;
@@ -265,7 +266,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 		ResultSet rs;
 		MutationState state = conn.unwrap(PhoenixConnection.class)
 				.getMutationState();
-		state.startTransaction();
+		state.startTransaction(TransactionFactory.Provider.TEPHRA);
 		long wp = state.getWritePointer();
 		conn.createStatement().execute(
 				"upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + "");
@@ -329,7 +330,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 			conn.commit();
 
 	        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
-	        state.startTransaction();
+	        state.startTransaction(TransactionFactory.Provider.TEPHRA);
 	        long wp = state.getWritePointer();
 	        conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
 	        assertEquals(PhoenixVisibilityLevel.SNAPSHOT, state.getVisibilityLevel());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index 17e6fb6..9f3dd59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,12 +23,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.util.ScanUtil;
 
 public interface IndexMetaDataCache extends Closeable {
-    public static int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 0);
     public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() {
 
         @Override
@@ -47,7 +46,7 @@ public interface IndexMetaDataCache extends Closeable {
         
         @Override
         public int getClientVersion() {
-            return UNKNOWN_CLIENT_VERSION;
+            return ScanUtil.UNKNOWN_CLIENT_VERSION;
         }
         
     };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 7985314..5f9c76c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -783,8 +782,8 @@ public class DeleteCompiler {
                     byte[] uuidValue = ServerCacheClient.generateId();
                     context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                     context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                    context.getScan().setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                     context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                    ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION);
                 }
                 ResultIterator iterator = aggPlan.iterator();
                 try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index b5293bb..3faada7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -821,7 +821,7 @@ public class FromCompiler {
             PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, PTableType.SUBQUERY, null,
                     MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null,
                     Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false,
-                    false, null, null, null, false, false, 0, 0L, SchemaUtil
+                    false, null, null, null, false, null, 0, 0L, SchemaUtil
                             .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, true);
 
             String alias = subselectNode.getAlias();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 88e8f50..824d933 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1276,7 +1276,7 @@ public class JoinCompiler {
                 left.getBucketNum(), merged, left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(),
                 left.isImmutableRows(), Collections.<PName> emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL,
                 left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(),
-                left.getIndexType(), left.rowKeyOrderOptimizable(), left.isTransactional(),
+                left.getIndexType(), left.rowKeyOrderOptimizable(), left.getTransactionProvider(),
                 left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp(), left.isNamespaceMapped(), 
                 left.getAutoPartitionSeqName(), left.isAppendOnlySchema(), ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, left.useStatsForParallelization());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 6e52cd5..91be356 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -167,7 +167,7 @@ public class TupleProjectionCompiler {
                 table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName> emptyList(),
                 table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(),
                 table.getViewIndexId(),
-                table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), 
+                table.getIndexType(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), 
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
     
@@ -198,7 +198,7 @@ public class TupleProjectionCompiler {
                 table.getBucketNum(), projectedColumns, null, null,
                 Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(),
-                table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(),
+                table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.getTransactionProvider(),
                 table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), cqCounter, table.useStatsForParallelization());
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index d5bfef8..9ca92f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -35,12 +35,12 @@ import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -100,7 +100,7 @@ public class UnionCompiler {
             UNION_SCHEMA_NAME, UNION_TABLE_NAME, PTableType.SUBQUERY, null,
             HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn,
             null, null, projectedColumns, null, null, null, true, null, null, null, true,
-            true, true, null, null, null, false, false, 0, 0L,
+            true, true, null, null, null, false, null, 0, 0L,
             SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY,
                 statement.getConnection().getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, true);
         TableRef tableRef = new TableRef(null, tempTable, 0, false);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 1a9a686..22119a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -1020,8 +1020,8 @@ public class UpsertCompiler {
                 byte[] uuidValue = ServerCacheClient.generateId();
                 scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                scan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                 scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
             }
             ResultIterator iterator = aggPlan.iterator();
             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 110a4ed..60cc150 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -49,10 +48,8 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.iterate.RegionScannerFactory;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
-import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
@@ -110,6 +107,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public final static String QUALIFIER_ENCODING_SCHEME = "_QualifierEncodingScheme";
     public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = "_ImmutableStorageEncodingScheme";
     public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = "_UseEncodedColumnQualifierList";
+    public static final String CLIENT_VERSION = "_ClientVersion";
     
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 05ad959..a2d008b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -69,6 +69,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION_BYTES;
@@ -228,6 +229,7 @@ import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -295,6 +297,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
     private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
     private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
+    private static final KeyValue TRANSACTION_PROVIDER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTION_PROVIDER_BYTES);
     private static final KeyValue UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES);
     private static final KeyValue IS_NAMESPACE_MAPPED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
@@ -326,6 +329,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             BASE_COLUMN_COUNT_KV,
             ROW_KEY_ORDER_OPTIMIZABLE_KV,
             TRANSACTIONAL_KV,
+            TRANSACTION_PROVIDER_KV,
             UPDATE_CACHE_FREQUENCY_KV,
             IS_NAMESPACE_MAPPED_KV,
             AUTO_PARTITION_SEQ_KV,
@@ -357,6 +361,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
     private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
     private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
+    private static final int TRANSACTION_PROVIDER_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTION_PROVIDER_KV);
     private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV);
     private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV);
     private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
@@ -961,7 +966,24 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
         boolean storeNulls = storeNullsKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
         Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
-        boolean transactional = transactionalKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(transactionalKv.getValueArray(), transactionalKv.getValueOffset(), transactionalKv.getValueLength()));
+        Cell transactionProviderKv = tableKeyValues[TRANSACTION_PROVIDER_INDEX];
+        TransactionFactory.Provider transactionProvider = null;
+        if (transactionProviderKv == null) {
+            if (transactionalKv != null && Boolean.TRUE.equals(
+                    PBoolean.INSTANCE.toObject(
+                            transactionalKv.getValueArray(), 
+                            transactionalKv.getValueOffset(), 
+                            transactionalKv.getValueLength()))) {
+                // For backward compat, prior to client setting TRANSACTION_PROVIDER
+                transactionProvider = TransactionFactory.Provider.TEPHRA;
+            }
+        } else {
+            transactionProvider = TransactionFactory.Provider.fromCode(
+                    PTinyint.INSTANCE.getCodec().decodeByte(
+                            transactionProviderKv.getValueArray(),
+                            transactionProviderKv.getValueOffset(), 
+                            SortOrder.getDefault()));
+        }
         Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
         ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
         Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
@@ -1044,7 +1066,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum,
                 pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName,
                 viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
-                rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount,
+                rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency, baseColumnCount,
                 indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounter, useStatsForParallelization);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 4c4c96f..a71ce0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -95,7 +95,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
     // Since there's no upgrade code, keep the version the same as the previous version
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_TABLE_TIMESTAMP + 28;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0;
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 393a2f9..4968525 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -83,6 +82,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
@@ -496,7 +496,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 									conn);
 							byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
 							dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-							dataTableScan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
+							ScanUtil.setClientVersion(dataTableScan, MetaDataProtocol.PHOENIX_VERSION);
                             LOG.info("Starting to partially build indexes:" + indexesToPartiallyRebuild
                                     + " on data table:" + dataPTable.getName() + " with the earliest disable timestamp:"
                                     + earliestDisableTimestamp + " till "

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
deleted file mode 100644
index 0c26ecc..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.coprocessor;
-
-import org.apache.phoenix.transaction.TransactionFactory;
-
-public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
-
-    public PhoenixTransactionalProcessor() {
-        super(TransactionFactory.getTransactionProvider().getTransactionContext().getCoprocessor());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 448f61c..9d78659 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
@@ -37,6 +36,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachin
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.protobuf.RpcCallback;
@@ -75,7 +75,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
           ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
           tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()),
               cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer(),
-              request.hasClientVersion() ? request.getClientVersion() : IndexMetaDataCache.UNKNOWN_CLIENT_VERSION);
+              request.hasClientVersion() ? request.getClientVersion() : ScanUtil.UNKNOWN_CLIENT_VERSION);
         } catch (Throwable e) {
             ProtobufUtil.setControllerException(controller,
                 ServerUtil.createIOException("Error when adding cache: ", e));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
new file mode 100644
index 0000000..a702bc4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+
+public class TephraTransactionalProcessor extends DelegateRegionObserver {
+
+    public TephraTransactionalProcessor() {
+        super(new TransactionProcessor());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index de57772..31b512a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -277,7 +277,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
            }
            if (clientVersionBytes != null) {
-               m.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
+               m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
            }
         }
     }
@@ -523,7 +523,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             useIndexProto = false;
         }
 
-        byte[] clientVersionBytes = scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION);
+        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         boolean acquiredLock = false;
         boolean incrScanRefCount = false;
         final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
@@ -1023,7 +1023,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             useProto = false;
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
-        byte[] clientVersionBytes = scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION);
+        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         boolean hasMore;
         int rowCount = 0;
         try {
@@ -1048,7 +1048,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     put.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
-                                    put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
+                                    put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(put);
                                     // Since we're replaying existing mutations, it makes no sense to write them to the wal
                                     put.setDurability(Durability.SKIP_WAL);
@@ -1060,7 +1060,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     del.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
-                                    del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
+                                    del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(del);
                                     // Since we're replaying existing mutations, it makes no sense to write them to the wal
                                     del.setDurability(Durability.SKIP_WAL);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 278f301..8d500e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3520,6 +3520,16 @@ public final class PTableProtos {
      * <code>optional bool useStatsForParallelization = 37;</code>
      */
     boolean getUseStatsForParallelization();
+
+    // optional int32 transactionProvider = 38;
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    boolean hasTransactionProvider();
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    int getTransactionProvider();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3771,6 +3781,11 @@ public final class PTableProtos {
               useStatsForParallelization_ = input.readBool();
               break;
             }
+            case 304: {
+              bitField1_ |= 0x00000001;
+              transactionProvider_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3823,6 +3838,7 @@ public final class PTableProtos {
     }
 
     private int bitField0_;
+    private int bitField1_;
     // required bytes schemaNameBytes = 1;
     public static final int SCHEMANAMEBYTES_FIELD_NUMBER = 1;
     private com.google.protobuf.ByteString schemaNameBytes_;
@@ -4534,6 +4550,22 @@ public final class PTableProtos {
       return useStatsForParallelization_;
     }
 
+    // optional int32 transactionProvider = 38;
+    public static final int TRANSACTIONPROVIDER_FIELD_NUMBER = 38;
+    private int transactionProvider_;
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    public boolean hasTransactionProvider() {
+      return ((bitField1_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    public int getTransactionProvider() {
+      return transactionProvider_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4571,6 +4603,7 @@ public final class PTableProtos {
       encodingScheme_ = com.google.protobuf.ByteString.EMPTY;
       encodedCQCounters_ = java.util.Collections.emptyList();
       useStatsForParallelization_ = false;
+      transactionProvider_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4746,6 +4779,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x80000000) == 0x80000000)) {
         output.writeBool(37, useStatsForParallelization_);
       }
+      if (((bitField1_ & 0x00000001) == 0x00000001)) {
+        output.writeInt32(38, transactionProvider_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4904,6 +4940,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(37, useStatsForParallelization_);
       }
+      if (((bitField1_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(38, transactionProvider_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5095,6 +5135,11 @@ public final class PTableProtos {
         result = result && (getUseStatsForParallelization()
             == other.getUseStatsForParallelization());
       }
+      result = result && (hasTransactionProvider() == other.hasTransactionProvider());
+      if (hasTransactionProvider()) {
+        result = result && (getTransactionProvider()
+            == other.getTransactionProvider());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5252,6 +5297,10 @@ public final class PTableProtos {
         hash = (37 * hash) + USESTATSFORPARALLELIZATION_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getUseStatsForParallelization());
       }
+      if (hasTransactionProvider()) {
+        hash = (37 * hash) + TRANSACTIONPROVIDER_FIELD_NUMBER;
+        hash = (53 * hash) + getTransactionProvider();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5448,6 +5497,8 @@ public final class PTableProtos {
         }
         useStatsForParallelization_ = false;
         bitField1_ = (bitField1_ & ~0x00000008);
+        transactionProvider_ = 0;
+        bitField1_ = (bitField1_ & ~0x00000010);
         return this;
       }
 
@@ -5477,6 +5528,7 @@ public final class PTableProtos {
         int from_bitField0_ = bitField0_;
         int from_bitField1_ = bitField1_;
         int to_bitField0_ = 0;
+        int to_bitField1_ = 0;
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
           to_bitField0_ |= 0x00000001;
         }
@@ -5637,7 +5689,12 @@ public final class PTableProtos {
           to_bitField0_ |= 0x80000000;
         }
         result.useStatsForParallelization_ = useStatsForParallelization_;
+        if (((from_bitField1_ & 0x00000010) == 0x00000010)) {
+          to_bitField1_ |= 0x00000001;
+        }
+        result.transactionProvider_ = transactionProvider_;
         result.bitField0_ = to_bitField0_;
+        result.bitField1_ = to_bitField1_;
         onBuilt();
         return result;
       }
@@ -5841,6 +5898,9 @@ public final class PTableProtos {
         if (other.hasUseStatsForParallelization()) {
           setUseStatsForParallelization(other.getUseStatsForParallelization());
         }
+        if (other.hasTransactionProvider()) {
+          setTransactionProvider(other.getTransactionProvider());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7920,6 +7980,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int32 transactionProvider = 38;
+      private int transactionProvider_ ;
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public boolean hasTransactionProvider() {
+        return ((bitField1_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public int getTransactionProvider() {
+        return transactionProvider_;
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public Builder setTransactionProvider(int value) {
+        bitField1_ |= 0x00000010;
+        transactionProvider_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public Builder clearTransactionProvider() {
+        bitField1_ = (bitField1_ & ~0x00000010);
+        transactionProvider_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -8587,7 +8680,7 @@ public final class PTableProtos {
       "\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030",
       "\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(\003\022\025\n\rke" +
       "yBytesCount\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001" +
-      "(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts\"\220\007" +
+      "(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts\"\255\007" +
       "\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tab" +
       "leNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.P" +
       "TableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenc" +
@@ -8610,12 +8703,13 @@ public final class PTableProtos {
       "\n\017parentNameBytes\030! \001(\014\022\025\n\rstorageScheme" +
       "\030\" \001(\014\022\026\n\016encodingScheme\030# \001(\014\022,\n\021encode" +
       "dCQCounters\030$ \003(\0132\021.EncodedCQCounter\022\"\n\032" +
-      "useStatsForParallelization\030% \001(\010\"6\n\020Enco" +
-      "dedCQCounter\022\021\n\tcolFamily\030\001 \002(\t\022\017\n\007count" +
-      "er\030\002 \002(\005*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004US" +
-      "ER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(o" +
-      "rg.apache.phoenix.coprocessor.generatedB",
-      "\014PTableProtosH\001\210\001\001\240\001\001"
+      "useStatsForParallelization\030% \001(\010\022\033\n\023tran" +
+      "sactionProvider\030& \001(\005\"6\n\020EncodedCQCounte" +
+      "r\022\021\n\tcolFamily\030\001 \002(\t\022\017\n\007counter\030\002 \002(\005*A\n" +
+      "\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE" +
+      "W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p",
+      "hoenix.coprocessor.generatedB\014PTableProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8639,7 +8733,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", "IsNamespaceMapped", "AutoParititonSeqName", "IsAppendOnlySchema", "ParentNameBytes", "StorageScheme", "EncodingScheme", "EncodedCQCounters", "UseStatsForParallelization", });
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", "IsNamespaceMapped", "AutoParititonSeqName", "IsAppendOnlySchema", "ParentNameBytes", "StorageScheme", "EncodingScheme", "EncodedCQCounters", "UseStatsForParallelization", "TransactionProvider", });
           internal_static_EncodedCQCounter_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_EncodedCQCounter_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9cbc67e..e9e209b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -294,6 +294,10 @@ public enum SQLExceptionCode {
     SEQUENCE_NOT_CASTABLE_TO_AUTO_PARTITION_ID_COLUMN(1086, "44A17", "Sequence Value not castable to auto-partition id column"),
     CANNOT_COERCE_AUTO_PARTITION_ID(1087, "44A18", "Auto-partition id cannot be coerced"),
     CANNOT_CREATE_INDEX_ON_MUTABLE_TABLE_WITH_ROWTIMESTAMP(1088, "44A19", "Cannot create an index on a mutable table that has a ROW_TIMESTAMP column."),
+    UNKNOWN_TRANSACTION_PROVIDER(1089,"44A20", "Unknown TRANSACTION_PROVIDER: "),
+    CANNOT_START_TXN_IF_TXN_DISABLED(1091, "44A22", "Cannot start transaction if transactions are disabled."),
+    CANNOT_MIX_TXN_PROVIDERS(1092, "44A23", "Cannot mix transaction providers: "),
+    CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for "),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index b152030..8a8c822 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -252,6 +253,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
             return newIterator(scanGrouper, scan, caches);
         }
         
+        ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+        
         // Set miscellaneous scan attributes. This is the last chance to set them before we
         // clone the scan for each parallelized chunk.
         TableRef tableRef = context.getCurrentTable();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 839194a..18f4fea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -72,6 +72,8 @@ import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
 import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
@@ -91,8 +93,8 @@ import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -139,7 +141,7 @@ public class MutationState implements SQLCloseable {
     private boolean isExternalTxContext = false;
     private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
 
-    final PhoenixTransactionContext phoenixTransactionContext;
+    private PhoenixTransactionContext phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
 
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -181,17 +183,13 @@ public class MutationState implements SQLCloseable {
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-        if (!subTask) {
-            if (txContext == null) {
-                phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(connection);
-            } else {
-                isExternalTxContext = true;
-                phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask);
-            }
-        } else {
+        if (subTask) {
             // this code path is only used while running child scans, we can't pass the txContext to child scans
             // as it is not thread safe, so we use the tx member variable
-            phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask);
+            phoenixTransactionContext = txContext.newTransactionContext(txContext, subTask);
+        } else if (txContext != null) {
+            isExternalTxContext = true;
+            phoenixTransactionContext = txContext.newTransactionContext(txContext, subTask);
         }
     }
 
@@ -234,7 +232,7 @@ public class MutationState implements SQLCloseable {
     public void commitDDLFence(PTable dataTable) throws SQLException {
         if (dataTable.isTransactional()) {
             try {
-                phoenixTransactionContext.commitDDLFence(dataTable, logger);
+                phoenixTransactionContext.commitDDLFence(dataTable);
             } finally {
                 // The client expects a transaction to be in progress on the txContext while the
                 // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's
@@ -300,14 +298,12 @@ public class MutationState implements SQLCloseable {
     // Though MutationState is not thread safe in general, this method should be because it may
     // be called by TableResultIterator in a multi-threaded manner. Since we do not want to expose
     // the Transaction outside of MutationState, this seems reasonable, as the member variables
-    // would not change as these threads are running.
+    // would not change as these threads are running. We also clone mutationState to ensure that
+    // the transaction context won't change due to a commit when auto commit is true.
     public HTableInterface getHTable(PTable table) throws SQLException {
         HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
         if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) {
-            PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table);
-            // Using cloned mutationState as we may have started a new transaction already
-            // if auto commit is true and we need to use the original one here.
-            htable = phoenixTransactionTable;
+            htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows());
         }
         return htable;
     }
@@ -334,13 +330,32 @@ public class MutationState implements SQLCloseable {
         return phoenixTransactionContext.getVisibilityLevel();
     }
 
-    public boolean startTransaction() throws SQLException {
+    public boolean startTransaction(Provider provider) throws SQLException {
+        if (provider == null) {
+            return false;
+        }
+        if (!connection.getQueryServices().getProps().getBoolean(
+                QueryServices.TRANSACTIONS_ENABLED,
+                QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.CANNOT_START_TXN_IF_TXN_DISABLED)
+                    .build().buildException();
+        }
         if (connection.getSCN() != null) {
             throw new SQLExceptionInfo.Builder(
                     SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
                     .build().buildException();
         }
 
+        if (phoenixTransactionContext == PhoenixTransactionContext.NULL_CONTEXT) {
+            phoenixTransactionContext = provider.getTransactionProvider().getTransactionContext(connection);
+        } else {
+            if (provider != phoenixTransactionContext.getProvider()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MIX_TXN_PROVIDERS)
+                        .setMessage(phoenixTransactionContext.getProvider().name() + " and " + provider.name())
+                        .build().buildException();
+            }
+        }
         if (!isTransactionStarted()) {
             // Clear any transactional state in case transaction was ended outside
             // of Phoenix so we don't carry the old transaction state forward. We
@@ -1069,6 +1084,7 @@ public class MutationState implements SQLCloseable {
                         if (table.isTransactional()) {
                             // Track tables to which we've sent uncommitted data
                             uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                            phoenixTransactionContext.markDMLFence(table);
 
                             // If we have indexes, wrap the HTable in a delegate HTable that
                             // will attach the necessary index meta data in the event of a
@@ -1077,7 +1093,7 @@ public class MutationState implements SQLCloseable {
                                 hTable = new MetaDataAwareHTable(hTable, origTableRef);
                             }
 
-                            hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table);
+                            hTable = phoenixTransactionContext.getTransactionalTable(hTable, table.isImmutableRows());
                         }
                         
                         numMutations = mutationList.size();
@@ -1223,10 +1239,6 @@ public class MutationState implements SQLCloseable {
         return phoenixTransactionContext.encodeTransaction();
     }
 
-    public static PhoenixTransactionContext decodeTransaction(byte[] txnBytes) throws IOException {
-        return TransactionFactory.getTransactionProvider().getTransactionContext(txnBytes);
-    }
-
     private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
             ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
         PTable table = tableRef.getTable();
@@ -1269,7 +1281,7 @@ public class MutationState implements SQLCloseable {
             mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
             if (attribValue != null) {
                 mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-                mutation.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
+                mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                 if (txState.length > 0) {
                     mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                 }
@@ -1301,7 +1313,7 @@ public class MutationState implements SQLCloseable {
         numRows = 0;
         estimatedSize = 0;
         this.mutations.clear();
-        resetTransactionalState();
+        phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
     }
 
     private void resetTransactionalState() {
@@ -1365,13 +1377,18 @@ public class MutationState implements SQLCloseable {
                         }
                     }
                 } finally {
+                    TransactionFactory.Provider provider = phoenixTransactionContext.getProvider();
                     try {
                         resetState();
                     } finally {
                         if (retryCommit) {
-                            startTransaction();
+                            startTransaction(provider);
                             // Add back read fences
                             Set<TableRef> txTableRefs = txMutations.keySet();
+                            for (TableRef tableRef : txTableRefs) {
+                                PTable dataTable = tableRef.getTable();
+                                phoenixTransactionContext.markDMLFence(dataTable);
+                            }
                             try {
                                 // Only retry if an index was added
                                 retryCommit = shouldResubmitTransaction(txTableRefs);
@@ -1476,9 +1493,12 @@ public class MutationState implements SQLCloseable {
             List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size());
             while (filteredTableRefs.hasNext()) {
                 TableRef tableRef = filteredTableRefs.next();
+                // REVIEW: unclear if we need this given we start transactions when resolving a table
+                if (tableRef.getTable().isTransactional()) {
+                    startTransaction(tableRef.getTable().getTransactionProvider());
+                }
                 strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
             }
-            startTransaction();
             send(strippedAliases.iterator());
             return true;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f37966c/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index 7d6154e..6348d6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -31,8 +31,6 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -64,8 +63,6 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
-import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -76,9 +73,6 @@ import com.google.common.primitives.Longs;
 
 
 public class PhoenixTxIndexMutationGenerator {
-
-    private static final Log LOG = LogFactory.getLog(PhoenixTxIndexMutationGenerator.class);
-
     private final PhoenixIndexCodec codec;
     private final PhoenixIndexMetaData indexMetaData;
 
@@ -181,7 +175,7 @@ public class PhoenixTxIndexMutationGenerator {
             scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
             ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
             scanRanges.initializeScan(scan);
-            PhoenixTransactionalTable txTable = TransactionFactory.getTransactionProvider().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
+            Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, isImmutable);
             // For rollback, we need to see all versions, including
             // the last committed version as there may be multiple
             // checkpointed versions.