You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2022/02/02 05:14:44 UTC

[phoenix] branch master updated: PHOENIX-6441 Remove TSOMockModule reference from OmidTransactionProvider

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 22f5d22  PHOENIX-6441 Remove TSOMockModule reference from OmidTransactionProvider
22f5d22 is described below

commit 22f5d22bc20df3e419440d6076c98412b4b30ae2
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Jan 27 10:54:35 2022 +0100

    PHOENIX-6441 Remove TSOMockModule reference from OmidTransactionProvider
---
 phoenix-core/pom.xml                               |  10 +-
 .../end2end/ConnectionQueryServicesTestImpl.java   |   3 +-
 .../phoenix/tx/ParameterizedTransactionIT.java     |  10 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  38 ++--
 .../NotAvailableTransactionProvider.java           |  14 +-
 .../transaction/OmidTransactionProvider.java       |  97 +---------
 .../transaction/PhoenixTransactionProvider.java    |   6 +-
 .../transaction/TephraTransactionProvider.java     |  68 +------
 .../phoenix/transaction/TransactionFactory.java    |   1 -
 .../transaction/TestTransactionServiceManager.java | 207 +++++++++++++++++++++
 10 files changed, 257 insertions(+), 197 deletions(-)

diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index d022973..5167399 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -385,10 +385,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.omid</groupId>
-      <artifactId>omid-tso-server-hbase2.x</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.omid</groupId>
       <artifactId>omid-hbase-common-hbase2.x</artifactId>
     </dependency>
     <dependency>
@@ -416,6 +412,12 @@
     <dependency>
       <groupId>org.apache.omid</groupId>
       <artifactId>omid-tso-server-hbase2.x</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.omid</groupId>
+      <artifactId>omid-tso-server-hbase2.x</artifactId>
+      <scope>test</scope>
       <type>test-jar</type>
     </dependency>
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index da88f3a..7f436ba 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.transaction.PhoenixTransactionClient;
 import org.apache.phoenix.transaction.PhoenixTransactionService;
+import org.apache.phoenix.transaction.TestTransactionServiceManager;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.SQLCloseables;
@@ -105,7 +106,7 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
         PhoenixTransactionService txService = txServices[provider.ordinal()];
         if (txService == null) {
             int port = TestUtil.getRandomPort();
-            txService = txServices[provider.ordinal()] = provider.getTransactionProvider().getTransactionService(config, connectionInfo, port);
+            txService = txServices[provider.ordinal()] = TestTransactionServiceManager.startTransactionService(provider, config, connectionInfo, port);
         }
         return super.initTransactionClient(provider);
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index 93e55f9..29aa9ef 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -63,7 +62,6 @@ import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -392,8 +390,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         assertFalse(rs.next());
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName));
-        Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor();
-        assertFalse(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
+        assertFalse(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName()));
         assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
                 getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)).
                 getColumnFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
@@ -416,8 +413,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         PTable table = pconn.getTable(new PTableKey(null, t1));
         Table htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor();
-        assertTrue(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
+        assertTrue(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName()));
         
         try {
             ddl = "ALTER TABLE " + t1 + " SET transactional=false";
@@ -461,7 +457,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         table = pconn.getTable(new PTableKey(null, t1));
         htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
+        assertTrue(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName()));
     }
 
     @Test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 27fe2ed..f340deb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -105,7 +105,6 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -151,7 +150,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
@@ -1150,26 +1148,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
 
             if (isTransactional) {
-                Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
-                if (!newDesc.hasCoprocessor(coprocessorClass.getName())) {
-                    builder.addCoprocessor(coprocessorClass.getName(), null, priority - 10, null);
-                }
-                Class<? extends RegionObserver> coprocessorGCClass = provider.getTransactionProvider().getGCCoprocessor();
-                if (coprocessorGCClass != null) {
-                    if (!newDesc.hasCoprocessor(coprocessorGCClass.getName())) {
-                        builder.addCoprocessor(coprocessorGCClass.getName(), null, priority - 10, null);
+                String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName();
+                if (!newDesc.hasCoprocessor(coprocessorClassName)) {
+                    builder.addCoprocessor(coprocessorClassName, null, priority - 10, null);
+                }
+                String coprocessorGCClassName = provider.getTransactionProvider().getGCCoprocessorClassName();
+                if (coprocessorGCClassName != null) {
+                    if (!newDesc.hasCoprocessor(coprocessorGCClassName)) {
+                        builder.addCoprocessor(coprocessorGCClassName, null, priority - 10, null);
                     }
                 }
             } else {
                 // Remove all potential transactional coprocessors
                 for (TransactionFactory.Provider aprovider : TransactionFactory.Provider.available()) {
-                    Class<? extends RegionObserver> coprocessorClass = aprovider.getTransactionProvider().getCoprocessor();
-                    Class<? extends RegionObserver> coprocessorGCClass = aprovider.getTransactionProvider().getGCCoprocessor();
-                    if (coprocessorClass != null && newDesc.hasCoprocessor(coprocessorClass.getName())) {
-                        builder.removeCoprocessor(coprocessorClass.getName());
+                    String coprocessorClassName = aprovider.getTransactionProvider().getCoprocessorClassName();
+                    String coprocessorGCClassName = aprovider.getTransactionProvider().getGCCoprocessorClassName();
+                    if (coprocessorClassName != null && newDesc.hasCoprocessor(coprocessorClassName)) {
+                        builder.removeCoprocessor(coprocessorClassName);
                     }
-                    if (coprocessorGCClass != null && newDesc.hasCoprocessor(coprocessorGCClass.getName())) {
-                        builder.removeCoprocessor(coprocessorGCClass.getName());
+                    if (coprocessorGCClassName != null && newDesc.hasCoprocessor(coprocessorGCClassName)) {
+                        builder.removeCoprocessor(coprocessorGCClassName);
                     }
                 }
             }
@@ -1610,8 +1608,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     private static boolean hasTxCoprocessor(TableDescriptor descriptor) {
         for (TransactionFactory.Provider provider : TransactionFactory.Provider.available()) {
-            Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
-            if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) {
+            String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName();
+            if (coprocessorClassName != null && descriptor.hasCoprocessor(coprocessorClassName)) {
                 return true;
             }
         }
@@ -1619,8 +1617,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     private static boolean equalTxCoprocessor(TransactionFactory.Provider provider, TableDescriptor existingDesc, TableDescriptor newDesc) {
-        Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
-        return (coprocessorClass != null && existingDesc.hasCoprocessor(coprocessorClass.getName()) && newDesc.hasCoprocessor(coprocessorClass.getName()));
+        String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName();
+        return (coprocessorClassName != null && existingDesc.hasCoprocessor(coprocessorClassName) && newDesc.hasCoprocessor(coprocessorClassName));
 }
 
     private void modifyTable(byte[] tableName, TableDescriptor newDesc, boolean shouldPoll) throws IOException,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
index 2daa6dc..a8db0c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
@@ -22,7 +22,6 @@ import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
@@ -60,21 +59,18 @@ public class NotAvailableTransactionProvider implements PhoenixTransactionProvid
     }
 
     @Override
-    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo, int port) {
-        throw new UnsupportedOperationException(message);
+    public Provider getProvider() {
+        return TransactionFactory.Provider.TEPHRA;
     }
 
     @Override
-    public Class<? extends RegionObserver> getCoprocessor() {
+    public String getCoprocessorClassName() {
         throw new UnsupportedOperationException(message);
     }
 
     @Override
-    public Class<? extends RegionObserver> getGCCoprocessor() {return null;}
-
-    @Override
-    public Provider getProvider() {
-        return TransactionFactory.Provider.TEPHRA;
+    public String getGCCoprocessorClassName() {
+        throw new UnsupportedOperationException(message);
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index 9247248..bbddc13 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -23,18 +23,11 @@ import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.omid.committable.CommitTable;
-import org.apache.omid.committable.InMemoryCommitTable;
 import org.apache.omid.transaction.HBaseOmidClientConfiguration;
 import org.apache.omid.transaction.HBaseTransactionManager;
 import org.apache.omid.transaction.TTable;
-import org.apache.omid.tso.TSOMockModule;
-import org.apache.omid.tso.TSOServer;
-import org.apache.omid.tso.TSOServerConfig;
-import org.apache.omid.tso.TSOServerConfig.WAIT_STRATEGY;
 import org.apache.omid.tso.client.OmidClientConfiguration;
-import org.apache.omid.tso.client.TSOClient;
 import org.apache.phoenix.coprocessor.OmidGCProcessor;
 import org.apache.phoenix.coprocessor.OmidTransactionalProcessor;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -42,22 +35,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
-import org.apache.phoenix.util.TransactionUtil;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 
 public class OmidTransactionProvider implements PhoenixTransactionProvider {
     private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
-    public static final String OMID_TSO_PORT = "phoenix.omid.tso.port";
-    public static final String OMID_TSO_CONFLICT_MAP_SIZE = "phoenix.omid.tso.conflict.map.size";
-    public static final String OMID_TSO_TIMESTAMP_TYPE = "phoenix.omid.tso.timestamp.type";
-    public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000;
-    public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME";
 
     private HBaseTransactionManager transactionManager = null;
     private volatile CommitTable.Client commitTableClient = null;
-    private CommitTable.Writer commitTableWriter = null;
 
     public static final OmidTransactionProvider getInstance() {
         return INSTANCE;
@@ -120,84 +103,20 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider {
         return commitTableClient;
     }
 
-    @Override
-    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws  SQLException{
-        TSOServerConfig tsoConfig = new TSOServerConfig();
-        TSOServer tso;
-
-        tsoConfig.setPort(port);
-        tsoConfig.setConflictMapSize(config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE));
-        tsoConfig.setTimestampType(config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE));
-        tsoConfig.setWaitStrategy(WAIT_STRATEGY.LOW_CPU.toString());
-
-        Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
-        tso = injector.getInstance(TSOServer.class);
-        tso.startAsync();
-        tso.awaitRunning();
-
-        OmidClientConfiguration clientConfig = new OmidClientConfiguration();
-        clientConfig.setConnectionString("localhost:" + port);
-        clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
-
-        InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
-
-        try {
-            // Create the associated Handler
-            TSOClient client = TSOClient.newInstance(clientConfig);
-
-            HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
-            clientConf.setConnectionString("localhost:" + port);
-            clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
-            clientConf.setHBaseConfiguration(config);
-            commitTableClient = commitTable.getClient();
-            commitTableWriter = commitTable.getWriter();
-            transactionManager = HBaseTransactionManager.builder(clientConf)
-                    .commitTableClient(commitTableClient)
-                    .commitTableWriter(commitTableWriter)
-                    .tsoClient(client).build();
-        } catch (IOException | InterruptedException e) {
-            throw new SQLExceptionInfo.Builder(
-                    SQLExceptionCode.TRANSACTION_FAILED)
-                    .setMessage(e.getMessage()).setRootCause(e).build()
-                    .buildException();
-        }
-
-        return new OmidTransactionService(tso, transactionManager);
-    }
-
-    static class OmidTransactionService implements PhoenixTransactionService {
-        private final HBaseTransactionManager transactionManager;
-        private TSOServer tso;
-
-        public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) {
-            this.tso = tso;
-            this.transactionManager = transactionManager;
-        }
-
-        public void start() {
-
-        }
-
-        @Override
-        public void close() throws IOException {
-            if (transactionManager != null) {
-                transactionManager.close();
-            }
-            if (tso != null) {
-                tso.stopAsync();
-                tso.awaitTerminated();
-            }
-        }
+    // For testing only
+    public void injectTestService(HBaseTransactionManager transactionManager, CommitTable.Client commitTableClient) {
+        this.transactionManager = transactionManager;
+        this.commitTableClient = commitTableClient;
     }
 
     @Override
-    public Class<? extends RegionObserver> getCoprocessor() {
-        return OmidTransactionalProcessor.class;
+    public String getCoprocessorClassName() {
+        return OmidTransactionalProcessor.class.getName();
     }
 
     @Override
-    public Class<? extends RegionObserver> getGCCoprocessor() {
-        return OmidGCProcessor.class;
+    public String getGCCoprocessorClassName() {
+        return OmidGCProcessor.class.getName();
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
index f50b6b5..d730c67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
@@ -22,7 +22,6 @@ import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -50,9 +49,8 @@ public interface PhoenixTransactionProvider {
     public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) throws SQLException;
 
     public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException;
-    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws  SQLException;
-    public Class<? extends RegionObserver> getCoprocessor();
-    public Class<? extends RegionObserver> getGCCoprocessor();
+    public String getCoprocessorClassName();
+    public String getGCCoprocessorClassName();
 
     public TransactionFactory.Provider getProvider();
     public boolean isUnsupported(Feature feature);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
index 9a216ee..7049858 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -33,14 +32,9 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.distributed.PooledClientProvider;
-import org.apache.tephra.distributed.TransactionService;
 import org.apache.tephra.distributed.TransactionServiceClient;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.zookeeper.TephraZKClientService;
-import org.apache.tephra.shaded.org.apache.twill.discovery.DiscoveryService;
 import org.apache.tephra.shaded.org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.tephra.shaded.org.apache.twill.zookeeper.RetryStrategies;
 import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientService;
@@ -48,7 +42,6 @@ import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientServices;
 import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClients;
 
 import org.apache.tephra.shaded.com.google.common.collect.ArrayListMultimap;
-import com.google.inject.util.Providers;
 
 public class TephraTransactionProvider implements PhoenixTransactionProvider {
     private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider();
@@ -110,57 +103,6 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider {
         return client;
     }
 
-    @Override
-    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo, int port) {
-        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, port);
-        int retryTimeOut = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC, 
-                TxConstants.Service.DEFAULT_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC);
-        ZKClientService zkClient = ZKClientServices.delegate(
-          ZKClients.reWatchOnExpire(
-            ZKClients.retryOnFailure(
-              ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
-                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
-                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
-                .build(),
-              RetryStrategies.exponentialDelay(500, retryTimeOut, TimeUnit.MILLISECONDS)
-            )
-          )
-        );
-
-        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
-        TransactionManager txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, 
-                new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
-        TransactionService txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
-        TephraTransactionService service = new TephraTransactionService(zkClient, txService);
-        service.start();
-        return service;
-    }
-
-    static class TephraTransactionService implements PhoenixTransactionService {
-        private final ZKClientService zkClient;
-        private final TransactionService txService;
-
-        public TephraTransactionService(ZKClientService zkClient, TransactionService txService) {
-            this.zkClient = zkClient;
-            this.txService = txService;
-        }
-        
-        public void start() {
-            zkClient.startAndWait();
-            txService.startAndWait();            
-        }
-        
-        @Override
-        public void close() throws IOException {
-            try {
-                if (txService != null) txService.stopAndWait();
-            } finally {
-                if (zkClient != null) zkClient.stopAndWait();
-            }
-        }
-        
-    }
-    
     static class TephraTransactionClient implements PhoenixTransactionClient {
         private final ZKClientService zkClient;
         private final TransactionSystemClient txClient;
@@ -188,14 +130,16 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider {
         }
         
     }
-    
+
     @Override
-    public Class<? extends RegionObserver> getCoprocessor() {
-        return TephraTransactionalProcessor.class;
+    public String getCoprocessorClassName() {
+        return TephraTransactionalProcessor.class.getName();
     }
 
     @Override
-    public Class<? extends RegionObserver> getGCCoprocessor() {return null;}
+    public String getGCCoprocessorClassName() {
+        return null;
+    }
 
     @Override
     public Provider getProvider() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index ab0d4c9..7f6afcb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.transaction;
 
 import java.io.IOException;
-
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 
 public class TransactionFactory {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java
new file mode 100644
index 0000000..a4a15d8
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.CommitTable.Client;
+import org.apache.omid.committable.CommitTable.Writer;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.transaction.HBaseOmidClientConfiguration;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.TSOServerConfig.WAIT_STRATEGY;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.apache.omid.tso.client.TSOClient;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.shaded.org.apache.twill.discovery.DiscoveryService;
+import org.apache.tephra.shaded.org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.tephra.shaded.org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientService;
+import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClients;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Providers;
+
+public class TestTransactionServiceManager {
+
+    public static final String OMID_TSO_PORT = "phoenix.omid.tso.port";
+    public static final String OMID_TSO_CONFLICT_MAP_SIZE = "phoenix.omid.tso.conflict.map.size";
+    public static final String OMID_TSO_TIMESTAMP_TYPE = "phoenix.omid.tso.timestamp.type";
+    public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000;
+    public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME";
+
+    public static OmidTransactionService startAndInjectOmidTransactionService(
+            OmidTransactionProvider transactionProvider, Configuration config,
+            ConnectionInfo connectionInfo, int port) throws SQLException {
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        TSOServer tso;
+
+        tsoConfig.setPort(port);
+        tsoConfig.setConflictMapSize(
+            config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE));
+        tsoConfig.setTimestampType(
+            config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE));
+        tsoConfig.setWaitStrategy(WAIT_STRATEGY.LOW_CPU.toString());
+
+        Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        tso = injector.getInstance(TSOServer.class);
+        tso.startAsync();
+        tso.awaitRunning();
+
+        OmidClientConfiguration clientConfig = new OmidClientConfiguration();
+        clientConfig.setConnectionString("localhost:" + port);
+        clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+
+        InMemoryCommitTable commitTable =
+                (InMemoryCommitTable) injector.getInstance(CommitTable.class);
+
+        HBaseTransactionManager transactionManager;
+        Client commitTableClient;
+        Writer commitTableWriter;
+        try {
+            // Create the associated Handler
+            TSOClient client = TSOClient.newInstance(clientConfig);
+
+            HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
+            clientConf.setConnectionString("localhost:" + port);
+            clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+            clientConf.setHBaseConfiguration(config);
+            commitTableClient = commitTable.getClient();
+            commitTableWriter = commitTable.getWriter();
+            transactionManager =
+                    HBaseTransactionManager.builder(clientConf).commitTableClient(commitTableClient)
+                            .commitTableWriter(commitTableWriter).tsoClient(client).build();
+        } catch (IOException | InterruptedException e) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build().buildException();
+        }
+
+        transactionProvider.injectTestService(transactionManager, commitTableClient);
+
+        return new OmidTransactionService(tso, transactionManager);
+    }
+
+    public static class OmidTransactionService implements PhoenixTransactionService {
+        private final HBaseTransactionManager transactionManager;
+        private TSOServer tso;
+
+        public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) {
+            this.tso = tso;
+            this.transactionManager = transactionManager;
+        }
+
+        public void start() {
+
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (transactionManager != null) {
+                transactionManager.close();
+            }
+            if (tso != null) {
+                tso.stopAsync();
+                tso.awaitTerminated();
+            }
+        }
+    }
+
+    public static TephraTransactionService startAndInjectTephraTransactionService(
+            TephraTransactionProvider transactionProvider,
+            Configuration config, ConnectionInfo connInfo, int port) {
+        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, port);
+        int retryTimeOut =
+                config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC,
+                    TxConstants.Service.DEFAULT_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC);
+        ZKClientService zkClient =
+                ZKClientServices.delegate(ZKClients.reWatchOnExpire(ZKClients.retryOnFailure(
+                    ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+                            .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+                                HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                            .build(),
+                    RetryStrategies.exponentialDelay(500, retryTimeOut, TimeUnit.MILLISECONDS))));
+
+        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+        TransactionManager txManager =
+                new TransactionManager(
+                        config, new HDFSTransactionStateStorage(config,
+                                new SnapshotCodecProvider(config), new TxMetricsCollector()),
+                        new TxMetricsCollector());
+        TransactionService txService =
+                new TransactionService(config, zkClient, discovery, Providers.of(txManager));
+        TephraTransactionService service = new TephraTransactionService(zkClient, txService);
+        service.start();
+
+        return service;
+    }
+
+    public static class TephraTransactionService implements PhoenixTransactionService {
+        private final ZKClientService zkClient;
+        private final TransactionService txService;
+
+        public TephraTransactionService(ZKClientService zkClient, TransactionService txService) {
+            this.zkClient = zkClient;
+            this.txService = txService;
+        }
+
+        public void start() {
+            zkClient.startAndWait();
+            txService.startAndWait();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                if (txService != null) txService.stopAndWait();
+            } finally {
+                if (zkClient != null) zkClient.stopAndWait();
+            }
+        }
+
+    }
+
+    public static PhoenixTransactionService startTransactionService(TransactionFactory.Provider provider, Configuration config, ConnectionInfo connInfo, int port) throws SQLException {
+        PhoenixTransactionProvider transactionProvider = provider.getTransactionProvider();
+        if(provider == Provider.TEPHRA) {
+            return startAndInjectTephraTransactionService((TephraTransactionProvider)transactionProvider, config, connInfo, port);
+        } else if (provider == Provider.OMID) {
+            return startAndInjectOmidTransactionService((OmidTransactionProvider)transactionProvider, config, connInfo, port);
+        }
+        throw new UnsupportedOperationException("Unknown transaction provider");
+    }
+
+}
\ No newline at end of file