You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2022/02/02 05:14:44 UTC
[phoenix] branch master updated: PHOENIX-6441 Remove TSOMockModule reference from OmidTransactionProvider
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 22f5d22 PHOENIX-6441 Remove TSOMockModule reference from OmidTransactionProvider
22f5d22 is described below
commit 22f5d22bc20df3e419440d6076c98412b4b30ae2
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Jan 27 10:54:35 2022 +0100
PHOENIX-6441 Remove TSOMockModule reference from OmidTransactionProvider
---
phoenix-core/pom.xml | 10 +-
.../end2end/ConnectionQueryServicesTestImpl.java | 3 +-
.../phoenix/tx/ParameterizedTransactionIT.java | 10 +-
.../phoenix/query/ConnectionQueryServicesImpl.java | 38 ++--
.../NotAvailableTransactionProvider.java | 14 +-
.../transaction/OmidTransactionProvider.java | 97 +---------
.../transaction/PhoenixTransactionProvider.java | 6 +-
.../transaction/TephraTransactionProvider.java | 68 +------
.../phoenix/transaction/TransactionFactory.java | 1 -
.../transaction/TestTransactionServiceManager.java | 207 +++++++++++++++++++++
10 files changed, 257 insertions(+), 197 deletions(-)
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index d022973..5167399 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -385,10 +385,6 @@
</dependency>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-tso-server-hbase2.x</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.omid</groupId>
<artifactId>omid-hbase-common-hbase2.x</artifactId>
</dependency>
<dependency>
@@ -416,6 +412,12 @@
<dependency>
<groupId>org.apache.omid</groupId>
<artifactId>omid-tso-server-hbase2.x</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-tso-server-hbase2.x</artifactId>
+ <scope>test</scope>
<type>test-jar</type>
</dependency>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index da88f3a..7f436ba 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.transaction.PhoenixTransactionClient;
import org.apache.phoenix.transaction.PhoenixTransactionService;
+import org.apache.phoenix.transaction.TestTransactionServiceManager;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
import org.apache.phoenix.util.SQLCloseables;
@@ -105,7 +106,7 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
PhoenixTransactionService txService = txServices[provider.ordinal()];
if (txService == null) {
int port = TestUtil.getRandomPort();
- txService = txServices[provider.ordinal()] = provider.getTransactionProvider().getTransactionService(config, connectionInfo, port);
+ txService = txServices[provider.ordinal()] = TestTransactionServiceManager.startTransactionService(provider, config, connectionInfo, port);
}
return super.initTransactionClient(provider);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index 93e55f9..29aa9ef 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -63,7 +62,6 @@ import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.TestUtil;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -392,8 +390,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
assertFalse(rs.next());
htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName));
- Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor();
- assertFalse(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
+ assertFalse(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName()));
assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)).
getColumnFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
@@ -416,8 +413,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
PTable table = pconn.getTable(new PTableKey(null, t1));
Table htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
assertTrue(table.isTransactional());
- Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor();
- assertTrue(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
+ assertTrue(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName()));
try {
ddl = "ALTER TABLE " + t1 + " SET transactional=false";
@@ -461,7 +457,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
table = pconn.getTable(new PTableKey(null, t1));
htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
assertTrue(table.isTransactional());
- assertTrue(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
+ assertTrue(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName()));
}
@Test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 27fe2ed..f340deb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -105,7 +105,6 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -151,7 +150,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
@@ -1150,26 +1148,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
if (isTransactional) {
- Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
- if (!newDesc.hasCoprocessor(coprocessorClass.getName())) {
- builder.addCoprocessor(coprocessorClass.getName(), null, priority - 10, null);
- }
- Class<? extends RegionObserver> coprocessorGCClass = provider.getTransactionProvider().getGCCoprocessor();
- if (coprocessorGCClass != null) {
- if (!newDesc.hasCoprocessor(coprocessorGCClass.getName())) {
- builder.addCoprocessor(coprocessorGCClass.getName(), null, priority - 10, null);
+ String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName();
+ if (!newDesc.hasCoprocessor(coprocessorClassName)) {
+ builder.addCoprocessor(coprocessorClassName, null, priority - 10, null);
+ }
+ String coprocessorGCClassName = provider.getTransactionProvider().getGCCoprocessorClassName();
+ if (coprocessorGCClassName != null) {
+ if (!newDesc.hasCoprocessor(coprocessorGCClassName)) {
+ builder.addCoprocessor(coprocessorGCClassName, null, priority - 10, null);
}
}
} else {
// Remove all potential transactional coprocessors
for (TransactionFactory.Provider aprovider : TransactionFactory.Provider.available()) {
- Class<? extends RegionObserver> coprocessorClass = aprovider.getTransactionProvider().getCoprocessor();
- Class<? extends RegionObserver> coprocessorGCClass = aprovider.getTransactionProvider().getGCCoprocessor();
- if (coprocessorClass != null && newDesc.hasCoprocessor(coprocessorClass.getName())) {
- builder.removeCoprocessor(coprocessorClass.getName());
+ String coprocessorClassName = aprovider.getTransactionProvider().getCoprocessorClassName();
+ String coprocessorGCClassName = aprovider.getTransactionProvider().getGCCoprocessorClassName();
+ if (coprocessorClassName != null && newDesc.hasCoprocessor(coprocessorClassName)) {
+ builder.removeCoprocessor(coprocessorClassName);
}
- if (coprocessorGCClass != null && newDesc.hasCoprocessor(coprocessorGCClass.getName())) {
- builder.removeCoprocessor(coprocessorGCClass.getName());
+ if (coprocessorGCClassName != null && newDesc.hasCoprocessor(coprocessorGCClassName)) {
+ builder.removeCoprocessor(coprocessorGCClassName);
}
}
}
@@ -1610,8 +1608,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private static boolean hasTxCoprocessor(TableDescriptor descriptor) {
for (TransactionFactory.Provider provider : TransactionFactory.Provider.available()) {
- Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
- if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) {
+ String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName();
+ if (coprocessorClassName != null && descriptor.hasCoprocessor(coprocessorClassName)) {
return true;
}
}
@@ -1619,8 +1617,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
private static boolean equalTxCoprocessor(TransactionFactory.Provider provider, TableDescriptor existingDesc, TableDescriptor newDesc) {
- Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
- return (coprocessorClass != null && existingDesc.hasCoprocessor(coprocessorClass.getName()) && newDesc.hasCoprocessor(coprocessorClass.getName()));
+ String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName();
+ return (coprocessorClassName != null && existingDesc.hasCoprocessor(coprocessorClassName) && newDesc.hasCoprocessor(coprocessorClassName));
}
private void modifyTable(byte[] tableName, TableDescriptor newDesc, boolean shouldPoll) throws IOException,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
index 2daa6dc..a8db0c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
@@ -22,7 +22,6 @@ import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
@@ -60,21 +59,18 @@ public class NotAvailableTransactionProvider implements PhoenixTransactionProvid
}
@Override
- public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo, int port) {
- throw new UnsupportedOperationException(message);
+ public Provider getProvider() {
+ return TransactionFactory.Provider.TEPHRA;
}
@Override
- public Class<? extends RegionObserver> getCoprocessor() {
+ public String getCoprocessorClassName() {
throw new UnsupportedOperationException(message);
}
@Override
- public Class<? extends RegionObserver> getGCCoprocessor() {return null;}
-
- @Override
- public Provider getProvider() {
- return TransactionFactory.Provider.TEPHRA;
+ public String getGCCoprocessorClassName() {
+ throw new UnsupportedOperationException(message);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index 9247248..bbddc13 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -23,18 +23,11 @@ import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.omid.committable.CommitTable;
-import org.apache.omid.committable.InMemoryCommitTable;
import org.apache.omid.transaction.HBaseOmidClientConfiguration;
import org.apache.omid.transaction.HBaseTransactionManager;
import org.apache.omid.transaction.TTable;
-import org.apache.omid.tso.TSOMockModule;
-import org.apache.omid.tso.TSOServer;
-import org.apache.omid.tso.TSOServerConfig;
-import org.apache.omid.tso.TSOServerConfig.WAIT_STRATEGY;
import org.apache.omid.tso.client.OmidClientConfiguration;
-import org.apache.omid.tso.client.TSOClient;
import org.apache.phoenix.coprocessor.OmidGCProcessor;
import org.apache.phoenix.coprocessor.OmidTransactionalProcessor;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -42,22 +35,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
-import org.apache.phoenix.util.TransactionUtil;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
public class OmidTransactionProvider implements PhoenixTransactionProvider {
private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
- public static final String OMID_TSO_PORT = "phoenix.omid.tso.port";
- public static final String OMID_TSO_CONFLICT_MAP_SIZE = "phoenix.omid.tso.conflict.map.size";
- public static final String OMID_TSO_TIMESTAMP_TYPE = "phoenix.omid.tso.timestamp.type";
- public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000;
- public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME";
private HBaseTransactionManager transactionManager = null;
private volatile CommitTable.Client commitTableClient = null;
- private CommitTable.Writer commitTableWriter = null;
public static final OmidTransactionProvider getInstance() {
return INSTANCE;
@@ -120,84 +103,20 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider {
return commitTableClient;
}
- @Override
- public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws SQLException{
- TSOServerConfig tsoConfig = new TSOServerConfig();
- TSOServer tso;
-
- tsoConfig.setPort(port);
- tsoConfig.setConflictMapSize(config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE));
- tsoConfig.setTimestampType(config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE));
- tsoConfig.setWaitStrategy(WAIT_STRATEGY.LOW_CPU.toString());
-
- Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
- tso = injector.getInstance(TSOServer.class);
- tso.startAsync();
- tso.awaitRunning();
-
- OmidClientConfiguration clientConfig = new OmidClientConfiguration();
- clientConfig.setConnectionString("localhost:" + port);
- clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
-
- InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
-
- try {
- // Create the associated Handler
- TSOClient client = TSOClient.newInstance(clientConfig);
-
- HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
- clientConf.setConnectionString("localhost:" + port);
- clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
- clientConf.setHBaseConfiguration(config);
- commitTableClient = commitTable.getClient();
- commitTableWriter = commitTable.getWriter();
- transactionManager = HBaseTransactionManager.builder(clientConf)
- .commitTableClient(commitTableClient)
- .commitTableWriter(commitTableWriter)
- .tsoClient(client).build();
- } catch (IOException | InterruptedException e) {
- throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.TRANSACTION_FAILED)
- .setMessage(e.getMessage()).setRootCause(e).build()
- .buildException();
- }
-
- return new OmidTransactionService(tso, transactionManager);
- }
-
- static class OmidTransactionService implements PhoenixTransactionService {
- private final HBaseTransactionManager transactionManager;
- private TSOServer tso;
-
- public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) {
- this.tso = tso;
- this.transactionManager = transactionManager;
- }
-
- public void start() {
-
- }
-
- @Override
- public void close() throws IOException {
- if (transactionManager != null) {
- transactionManager.close();
- }
- if (tso != null) {
- tso.stopAsync();
- tso.awaitTerminated();
- }
- }
+ // For testing only
+ public void injectTestService(HBaseTransactionManager transactionManager, CommitTable.Client commitTableClient) {
+ this.transactionManager = transactionManager;
+ this.commitTableClient = commitTableClient;
}
@Override
- public Class<? extends RegionObserver> getCoprocessor() {
- return OmidTransactionalProcessor.class;
+ public String getCoprocessorClassName() {
+ return OmidTransactionalProcessor.class.getName();
}
@Override
- public Class<? extends RegionObserver> getGCCoprocessor() {
- return OmidGCProcessor.class;
+ public String getGCCoprocessorClassName() {
+ return OmidGCProcessor.class.getName();
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
index f50b6b5..d730c67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
@@ -22,7 +22,6 @@ import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -50,9 +49,8 @@ public interface PhoenixTransactionProvider {
public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) throws SQLException;
public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException;
- public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws SQLException;
- public Class<? extends RegionObserver> getCoprocessor();
- public Class<? extends RegionObserver> getGCCoprocessor();
+ public String getCoprocessorClassName();
+ public String getGCCoprocessorClassName();
public TransactionFactory.Provider getProvider();
public boolean isUnsupported(Feature feature);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
index 9a216ee..7049858 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -33,14 +32,9 @@ import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.apache.tephra.distributed.PooledClientProvider;
-import org.apache.tephra.distributed.TransactionService;
import org.apache.tephra.distributed.TransactionServiceClient;
import org.apache.tephra.inmemory.InMemoryTxSystemClient;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.zookeeper.TephraZKClientService;
-import org.apache.tephra.shaded.org.apache.twill.discovery.DiscoveryService;
import org.apache.tephra.shaded.org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.tephra.shaded.org.apache.twill.zookeeper.RetryStrategies;
import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientService;
@@ -48,7 +42,6 @@ import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientServices;
import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClients;
import org.apache.tephra.shaded.com.google.common.collect.ArrayListMultimap;
-import com.google.inject.util.Providers;
public class TephraTransactionProvider implements PhoenixTransactionProvider {
private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider();
@@ -110,57 +103,6 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider {
return client;
}
- @Override
- public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo, int port) {
- config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, port);
- int retryTimeOut = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC,
- TxConstants.Service.DEFAULT_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC);
- ZKClientService zkClient = ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
- .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
- HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
- .build(),
- RetryStrategies.exponentialDelay(500, retryTimeOut, TimeUnit.MILLISECONDS)
- )
- )
- );
-
- DiscoveryService discovery = new ZKDiscoveryService(zkClient);
- TransactionManager txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config,
- new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
- TransactionService txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
- TephraTransactionService service = new TephraTransactionService(zkClient, txService);
- service.start();
- return service;
- }
-
- static class TephraTransactionService implements PhoenixTransactionService {
- private final ZKClientService zkClient;
- private final TransactionService txService;
-
- public TephraTransactionService(ZKClientService zkClient, TransactionService txService) {
- this.zkClient = zkClient;
- this.txService = txService;
- }
-
- public void start() {
- zkClient.startAndWait();
- txService.startAndWait();
- }
-
- @Override
- public void close() throws IOException {
- try {
- if (txService != null) txService.stopAndWait();
- } finally {
- if (zkClient != null) zkClient.stopAndWait();
- }
- }
-
- }
-
static class TephraTransactionClient implements PhoenixTransactionClient {
private final ZKClientService zkClient;
private final TransactionSystemClient txClient;
@@ -188,14 +130,16 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider {
}
}
-
+
@Override
- public Class<? extends RegionObserver> getCoprocessor() {
- return TephraTransactionalProcessor.class;
+ public String getCoprocessorClassName() {
+ return TephraTransactionalProcessor.class.getName();
}
@Override
- public Class<? extends RegionObserver> getGCCoprocessor() {return null;}
+ public String getGCCoprocessorClassName() {
+ return null;
+ }
@Override
public Provider getProvider() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index ab0d4c9..7f6afcb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -18,7 +18,6 @@
package org.apache.phoenix.transaction;
import java.io.IOException;
-
import org.apache.phoenix.coprocessor.MetaDataProtocol;
public class TransactionFactory {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java
new file mode 100644
index 0000000..a4a15d8
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.CommitTable.Client;
+import org.apache.omid.committable.CommitTable.Writer;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.transaction.HBaseOmidClientConfiguration;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.TSOServerConfig.WAIT_STRATEGY;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.apache.omid.tso.client.TSOClient;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.shaded.org.apache.twill.discovery.DiscoveryService;
+import org.apache.tephra.shaded.org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.tephra.shaded.org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientService;
+import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClients;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Providers;
+
+public class TestTransactionServiceManager {
+
+ public static final String OMID_TSO_PORT = "phoenix.omid.tso.port";
+ public static final String OMID_TSO_CONFLICT_MAP_SIZE = "phoenix.omid.tso.conflict.map.size";
+ public static final String OMID_TSO_TIMESTAMP_TYPE = "phoenix.omid.tso.timestamp.type";
+ public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000;
+ public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME";
+
+ public static OmidTransactionService startAndInjectOmidTransactionService(
+ OmidTransactionProvider transactionProvider, Configuration config,
+ ConnectionInfo connectionInfo, int port) throws SQLException {
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ TSOServer tso;
+
+ tsoConfig.setPort(port);
+ tsoConfig.setConflictMapSize(
+ config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE));
+ tsoConfig.setTimestampType(
+ config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE));
+ tsoConfig.setWaitStrategy(WAIT_STRATEGY.LOW_CPU.toString());
+
+ Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+ tso = injector.getInstance(TSOServer.class);
+ tso.startAsync();
+ tso.awaitRunning();
+
+ OmidClientConfiguration clientConfig = new OmidClientConfiguration();
+ clientConfig.setConnectionString("localhost:" + port);
+ clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+
+ InMemoryCommitTable commitTable =
+ (InMemoryCommitTable) injector.getInstance(CommitTable.class);
+
+ HBaseTransactionManager transactionManager;
+ Client commitTableClient;
+ Writer commitTableWriter;
+ try {
+ // Create the associated Handler
+ TSOClient client = TSOClient.newInstance(clientConfig);
+
+ HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
+ clientConf.setConnectionString("localhost:" + port);
+ clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+ clientConf.setHBaseConfiguration(config);
+ commitTableClient = commitTable.getClient();
+ commitTableWriter = commitTable.getWriter();
+ transactionManager =
+ HBaseTransactionManager.builder(clientConf).commitTableClient(commitTableClient)
+ .commitTableWriter(commitTableWriter).tsoClient(client).build();
+ } catch (IOException | InterruptedException e) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage()).setRootCause(e).build().buildException();
+ }
+
+ transactionProvider.injectTestService(transactionManager, commitTableClient);
+
+ return new OmidTransactionService(tso, transactionManager);
+ }
+
+ public static class OmidTransactionService implements PhoenixTransactionService {
+ private final HBaseTransactionManager transactionManager;
+ private TSOServer tso;
+
+ public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) {
+ this.tso = tso;
+ this.transactionManager = transactionManager;
+ }
+
+ public void start() {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (transactionManager != null) {
+ transactionManager.close();
+ }
+ if (tso != null) {
+ tso.stopAsync();
+ tso.awaitTerminated();
+ }
+ }
+ }
+
+ public static TephraTransactionService startAndInjectTephraTransactionService(
+ TephraTransactionProvider transactionProvider,
+ Configuration config, ConnectionInfo connInfo, int port) {
+ config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, port);
+ int retryTimeOut =
+ config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC,
+ TxConstants.Service.DEFAULT_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC);
+ ZKClientService zkClient =
+ ZKClientServices.delegate(ZKClients.reWatchOnExpire(ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+ .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+ HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+ .build(),
+ RetryStrategies.exponentialDelay(500, retryTimeOut, TimeUnit.MILLISECONDS))));
+
+ DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+ TransactionManager txManager =
+ new TransactionManager(
+ config, new HDFSTransactionStateStorage(config,
+ new SnapshotCodecProvider(config), new TxMetricsCollector()),
+ new TxMetricsCollector());
+ TransactionService txService =
+ new TransactionService(config, zkClient, discovery, Providers.of(txManager));
+ TephraTransactionService service = new TephraTransactionService(zkClient, txService);
+ service.start();
+
+ return service;
+ }
+
+ public static class TephraTransactionService implements PhoenixTransactionService {
+ private final ZKClientService zkClient;
+ private final TransactionService txService;
+
+ public TephraTransactionService(ZKClientService zkClient, TransactionService txService) {
+ this.zkClient = zkClient;
+ this.txService = txService;
+ }
+
+ public void start() {
+ zkClient.startAndWait();
+ txService.startAndWait();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (txService != null) txService.stopAndWait();
+ } finally {
+ if (zkClient != null) zkClient.stopAndWait();
+ }
+ }
+
+ }
+
+ public static PhoenixTransactionService startTransactionService(TransactionFactory.Provider provider, Configuration config, ConnectionInfo connInfo, int port) throws SQLException {
+ PhoenixTransactionProvider transactionProvider = provider.getTransactionProvider();
+ if(provider == Provider.TEPHRA) {
+ return startAndInjectTephraTransactionService((TephraTransactionProvider)transactionProvider, config, connInfo, port);
+ } else if (provider == Provider.OMID) {
+ return startAndInjectOmidTransactionService((OmidTransactionProvider)transactionProvider, config, connInfo, port);
+ }
+ throw new UnsupportedOperationException("Unknown transaction provider");
+ }
+
+}
\ No newline at end of file