You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/05/10 18:04:35 UTC
[36/46] phoenix git commit: Remove tephra dependency from BaseTest
Remove tephra dependency from BaseTest
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b3a21368
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b3a21368
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b3a21368
Branch: refs/heads/omid
Commit: b3a213685ef97a41c1a369f035949b22b03d6083
Parents: f090dd2
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Mon May 8 12:27:11 2017 +0300
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Mon May 8 12:27:11 2017 +0300
----------------------------------------------------------------------
.../apache/phoenix/execute/MutationState.java | 14 ++--
.../transaction/OmidTransactionContext.java | 19 ++++++
.../transaction/PhoenixTransactionContext.java | 16 +++++
.../transaction/TephraTransactionContext.java | 64 ++++++++++++++++++
.../transaction/TephraTransactionTable.java | 12 +++-
.../apache/phoenix/util/TransactionUtil.java | 4 +-
.../java/org/apache/phoenix/query/BaseTest.java | 68 ++------------------
7 files changed, 124 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 2b72be1..e8d963e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -297,7 +297,7 @@ public class MutationState implements SQLCloseable {
public HTableInterface getHTable(PTable table) throws SQLException {
HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) {
- PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table.isImmutableRows());
+ PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table);
// Using cloned mutationState as we may have started a new transaction already
// if auto commit is true and we need to use the original one here.
htable = phoenixTransactionTable;
@@ -970,7 +970,7 @@ public class MutationState implements SQLCloseable {
if (table.isTransactional()) {
// Track tables to which we've sent uncommitted data
txTableRefs.add(origTableRef);
- addDMLFence(table);
+// addDMLFence(table);
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
// If we have indexes, wrap the HTable in a delegate HTable that
@@ -980,7 +980,7 @@ public class MutationState implements SQLCloseable {
hTable = new MetaDataAwareHTable(hTable, origTableRef);
}
- hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table.isImmutableRows());
+ hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table);
}
long numMutations = mutationList.size();
@@ -1231,10 +1231,10 @@ public class MutationState implements SQLCloseable {
startTransaction();
// Add back read fences
Set<TableRef> txTableRefs = txMutations.keySet();
- for (TableRef tableRef : txTableRefs) {
- PTable dataTable = tableRef.getTable();
- addDMLFence(dataTable);
- }
+// for (TableRef tableRef : txTableRefs) {
+// PTable dataTable = tableRef.getTable();
+// addDMLFence(dataTable);
+// }
try {
// Only retry if an index was added
retryCommit = shouldResubmitTransaction(txTableRefs);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index cec07d3..25ec0cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -1,5 +1,6 @@
package org.apache.phoenix.transaction;
+import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;
@@ -141,4 +142,22 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setupTxManager(Configuration config, String url) throws SQLException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void tearDownTxManager() {
+ // TODO Auto-generated method stub
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 36f7804..5b1a837 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -8,6 +8,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
+import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;
@@ -164,4 +165,19 @@ public interface PhoenixTransactionContext {
* @return the family delete marker
*/
public byte[] get_famility_delete_marker();
+
+ /**
+ * Setup transaction manager's configuration for testing
+ */
+ public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException;
+
+ /**
+ * Setup transaction manager for testing
+ */
+ public void setupTxManager(Configuration config, String url) throws SQLException;
+
+ /**
+ * Tear down transaction manager for testing
+ */
+ public void tearDownTxManager();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 0334826..447ce0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -35,7 +35,13 @@ import org.apache.tephra.util.TxUtils;
import org.apache.tephra.visibility.FenceWait;
import org.apache.tephra.visibility.VisibilityFence;
import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.utils.Networks;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
@@ -43,6 +49,7 @@ import org.apache.twill.zookeeper.ZKClients;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
+import com.google.inject.util.Providers;
import org.slf4j.Logger;
@@ -51,6 +58,9 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
private static final TransactionCodec CODEC = new TransactionCodec();
private static TransactionSystemClient txClient = null;
+ private static ZKClientService zkClient = null;
+ private static TransactionService txService = null;
+ private static TransactionManager txManager = null;
private final List<TransactionAware> txAwares;
private final TransactionContext txContext;
@@ -410,6 +420,60 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
return TxConstants.FAMILY_DELETE_QUALIFIER;
}
+ @Override
+ public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
+ config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+ config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+ config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+ config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
+ config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder);
+ config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds);
+ config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+ config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
+ }
+
+ @Override
+ public void setupTxManager(Configuration config, String url) throws SQLException {
+
+ if (txService != null) {
+ return;
+ }
+
+ ConnectionInfo connInfo = ConnectionInfo.create(url);
+ zkClient = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+ .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+ HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+ .build(),
+ RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+ )
+ )
+ );
+ zkClient.startAndWait();
+
+ DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+ txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
+ txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
+ txService.startAndWait();
+ }
+
+ @Override
+ public void tearDownTxManager() {
+ try {
+ if (txService != null) txService.stopAndWait();
+ } finally {
+ try {
+ if (zkClient != null) zkClient.stopAndWait();
+ } finally {
+ txService = null;
+ zkClient = null;
+ txManager = null;
+ }
+ }
+ }
+
/**
* TephraTransactionContext specific functions
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
index e33a280..49753f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
@@ -38,18 +40,22 @@ public class TephraTransactionTable implements PhoenixTransactionalTable {
private TephraTransactionContext tephraTransactionContext;
public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
- this(ctx, hTable, false);
+ this(ctx, hTable, null);
}
- public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, boolean isImmutableRows) {
+ public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, PTable pTable) {
assert(ctx instanceof TephraTransactionContext);
tephraTransactionContext = (TephraTransactionContext) ctx;
- transactionAwareHTable = new TransactionAwareHTable(hTable, isImmutableRows ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
+ transactionAwareHTable = new TransactionAwareHTable(hTable, (pTable != null && pTable.isImmutableRows()) ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
tephraTransactionContext.addTransactionAware(transactionAwareHTable);
+
+ if (pTable != null && pTable.getType() != PTableType.INDEX) {
+ tephraTransactionContext.markDMLFence(pTable);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 0a55147..01b775e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -50,8 +50,8 @@ public class TransactionUtil {
return serverTimeStamp / TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond();
}
- public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, boolean isImmutableRows) {
- return new TephraTransactionTable(phoenixTransactionContext, htable, isImmutableRows);
+ public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) {
+ return new TephraTransactionTable(phoenixTransactionContext, htable, pTable);
}
// we resolve transactional tables at the txn read pointer
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 078c1e8..ff1007d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -130,12 +130,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ConfigUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -143,19 +143,6 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
-import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.distributed.TransactionService;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -165,7 +152,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.util.Providers;
/**
*
@@ -186,9 +172,6 @@ public abstract class BaseTest {
private static final Map<String,String> tableDDLMap;
private static final Logger logger = LoggerFactory.getLogger(BaseTest.class);
protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
- private static ZKClientService zkClient;
- private static TransactionService txService;
- protected static TransactionManager txManager;
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
private static final int dropTableTimeout = 300; // 5 mins should be long enough.
@@ -437,50 +420,15 @@ public abstract class BaseTest {
}
private static void tearDownTxManager() throws SQLException {
- try {
- if (txService != null) txService.stopAndWait();
- } finally {
- try {
- if (zkClient != null) zkClient.stopAndWait();
- } finally {
- txService = null;
- zkClient = null;
- txManager = null;
- }
- }
-
+ TransactionFactory.getTransactionFactory().getTransactionContext().tearDownTxManager();
}
-
+
protected static void setTxnConfigs() throws IOException {
- config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
- config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
- config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
- config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
- config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
- config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, DEFAULT_TXN_TIMEOUT_SECONDS);
- config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
- config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
+ TransactionFactory.getTransactionFactory().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
}
-
- protected static void setupTxManager() throws SQLException, IOException {
- ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
- zkClient = ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
- .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
- HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
- .build(),
- RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
- )
- )
- );
- zkClient.startAndWait();
- DiscoveryService discovery = new ZKDiscoveryService(zkClient);
- txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
- txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
- txService.startAndWait();
+ protected static void setupTxManager() throws SQLException, IOException {
+ TransactionFactory.getTransactionFactory().getTransactionContext().setupTxManager(config, getUrl());
}
private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {
@@ -499,9 +447,7 @@ public abstract class BaseTest {
}
private static void checkTxManagerInitialized(ReadOnlyProps clientProps) throws SQLException, IOException {
- if (txService == null) {
- setupTxManager();
- }
+ setupTxManager();
}
/**