You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/05/10 18:04:35 UTC

[36/46] phoenix git commit: Remove tephra dependency from BaseTest

Remove tephra dependency from BaseTest


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b3a21368
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b3a21368
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b3a21368

Branch: refs/heads/omid
Commit: b3a213685ef97a41c1a369f035949b22b03d6083
Parents: f090dd2
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Mon May 8 12:27:11 2017 +0300
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Mon May 8 12:27:11 2017 +0300

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   | 14 ++--
 .../transaction/OmidTransactionContext.java     | 19 ++++++
 .../transaction/PhoenixTransactionContext.java  | 16 +++++
 .../transaction/TephraTransactionContext.java   | 64 ++++++++++++++++++
 .../transaction/TephraTransactionTable.java     | 12 +++-
 .../apache/phoenix/util/TransactionUtil.java    |  4 +-
 .../java/org/apache/phoenix/query/BaseTest.java | 68 ++------------------
 7 files changed, 124 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 2b72be1..e8d963e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -297,7 +297,7 @@ public class MutationState implements SQLCloseable {
     public HTableInterface getHTable(PTable table) throws SQLException {
         HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
         if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) {
-            PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table.isImmutableRows());
+            PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table);
             // Using cloned mutationState as we may have started a new transaction already
             // if auto commit is true and we need to use the original one here.
             htable = phoenixTransactionTable;
@@ -970,7 +970,7 @@ public class MutationState implements SQLCloseable {
                         if (table.isTransactional()) {
                             // Track tables to which we've sent uncommitted data
                             txTableRefs.add(origTableRef);
-                            addDMLFence(table);
+//                            addDMLFence(table);
                             uncommittedPhysicalNames.add(table.getPhysicalName().getString());
 
                             // If we have indexes, wrap the HTable in a delegate HTable that
@@ -980,7 +980,7 @@ public class MutationState implements SQLCloseable {
                                 hTable = new MetaDataAwareHTable(hTable, origTableRef);
                             }
 
-                            hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table.isImmutableRows());                          
+                            hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table);
                         }
                         
                         long numMutations = mutationList.size();
@@ -1231,10 +1231,10 @@ public class MutationState implements SQLCloseable {
                             startTransaction();
                             // Add back read fences
                             Set<TableRef> txTableRefs = txMutations.keySet();
-                            for (TableRef tableRef : txTableRefs) {
-                                PTable dataTable = tableRef.getTable();
-                                addDMLFence(dataTable);
-                            }
+//                            for (TableRef tableRef : txTableRefs) {
+//                                PTable dataTable = tableRef.getTable();
+//                                addDMLFence(dataTable);
+//                            }
                             try {
                                 // Only retry if an index was added
                                 retryCommit = shouldResubmitTransaction(txTableRefs);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index cec07d3..25ec0cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -1,5 +1,6 @@
 package org.apache.phoenix.transaction;
 
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.concurrent.TimeoutException;
 
@@ -141,4 +142,22 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void setupTxManager(Configuration config, String url) throws SQLException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void tearDownTxManager() {
+        // TODO Auto-generated method stub
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 36f7804..5b1a837 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -8,6 +8,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.slf4j.Logger;
 
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.concurrent.TimeoutException;
 
@@ -164,4 +165,19 @@ public interface PhoenixTransactionContext {
      * @return the family delete marker
      */
     public byte[] get_famility_delete_marker(); 
+
+    /**
+     * Set up the transaction manager's configuration for testing
+     */
+     public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException;
+
+    /**
+     * Set up the transaction manager for testing
+     */
+    public void setupTxManager(Configuration config, String url) throws SQLException;
+
+    /**
+     * Tear down transaction manager for testing
+     */
+    public void tearDownTxManager();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 0334826..447ce0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -35,7 +35,13 @@ import org.apache.tephra.util.TxUtils;
 import org.apache.tephra.visibility.FenceWait;
 import org.apache.tephra.visibility.VisibilityFence;
 import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.twill.discovery.DiscoveryService;
 import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.zookeeper.RetryStrategies;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.apache.twill.zookeeper.ZKClientServices;
@@ -43,6 +49,7 @@ import org.apache.twill.zookeeper.ZKClients;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
+import com.google.inject.util.Providers;
 
 import org.slf4j.Logger;
 
@@ -51,6 +58,9 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
     private static final TransactionCodec CODEC = new TransactionCodec();
 
     private static TransactionSystemClient txClient = null;
+    private static ZKClientService zkClient = null;
+    private static TransactionService txService = null;
+    private static TransactionManager txManager = null;
 
     private final List<TransactionAware> txAwares;
     private final TransactionContext txContext;
@@ -410,6 +420,60 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         return TxConstants.FAMILY_DELETE_QUALIFIER;
     }
 
+    @Override
+    public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
+        config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+        config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+        config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
+        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder);
+        config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds);
+        config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+        config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
+    }
+
+    @Override
+    public void setupTxManager(Configuration config, String url) throws SQLException {
+
+        if (txService != null) {
+            return;
+        }
+
+        ConnectionInfo connInfo = ConnectionInfo.create(url);
+        zkClient = ZKClientServices.delegate(
+          ZKClients.reWatchOnExpire(
+            ZKClients.retryOnFailure(
+              ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                .build(),
+              RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+            )
+          )
+        );
+        zkClient.startAndWait();
+
+        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+        txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
+        txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
+        txService.startAndWait();
+    }
+
+    @Override
+    public void tearDownTxManager() {
+        try {
+            if (txService != null) txService.stopAndWait();
+        } finally {
+            try {
+                if (zkClient != null) zkClient.stopAndWait();
+            } finally {
+                txService = null;
+                zkClient = null;
+                txManager = null;
+            }
+        }
+    }
+
     /**
      * TephraTransactionContext specific functions
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
index e33a280..49753f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
 
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
@@ -38,18 +40,22 @@ public class TephraTransactionTable implements PhoenixTransactionalTable {
     private TephraTransactionContext tephraTransactionContext;
 
     public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
-        this(ctx, hTable, false);
+        this(ctx, hTable, null);
     }
 
-    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, boolean isImmutableRows) {
+    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, PTable pTable) {
 
         assert(ctx instanceof TephraTransactionContext);
 
         tephraTransactionContext = (TephraTransactionContext) ctx;
 
-        transactionAwareHTable = new TransactionAwareHTable(hTable, isImmutableRows ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
+        transactionAwareHTable = new TransactionAwareHTable(hTable, (pTable != null && pTable.isImmutableRows()) ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
 
         tephraTransactionContext.addTransactionAware(transactionAwareHTable);
+
+        if (pTable != null && pTable.getType() != PTableType.INDEX) {
+            tephraTransactionContext.markDMLFence(pTable);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 0a55147..01b775e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -50,8 +50,8 @@ public class TransactionUtil {
         return serverTimeStamp / TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond();
     }
     
-    public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, boolean isImmutableRows) {
-        return new TephraTransactionTable(phoenixTransactionContext, htable, isImmutableRows);
+    public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) {
+        return new TephraTransactionTable(phoenixTransactionContext, htable, pTable);
     }
     
     // we resolve transactional tables at the txn read pointer

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 078c1e8..ff1007d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -130,12 +130,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ConfigUtil;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -143,19 +143,6 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
-import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.distributed.TransactionService;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -165,7 +152,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.util.Providers;
 
 /**
  * 
@@ -186,9 +172,6 @@ public abstract class BaseTest {
     private static final Map<String,String> tableDDLMap;
     private static final Logger logger = LoggerFactory.getLogger(BaseTest.class);
     protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
-    private static ZKClientService zkClient;
-    private static TransactionService txService;
-    protected static TransactionManager txManager;
     @ClassRule
     public static TemporaryFolder tmpFolder = new TemporaryFolder();
     private static final int dropTableTimeout = 300; // 5 mins should be long enough.
@@ -437,50 +420,15 @@ public abstract class BaseTest {
     }
     
     private static void tearDownTxManager() throws SQLException {
-        try {
-            if (txService != null) txService.stopAndWait();
-        } finally {
-            try {
-                if (zkClient != null) zkClient.stopAndWait();
-            } finally {
-                txService = null;
-                zkClient = null;
-                txManager = null;
-            }
-        }
-        
+        TransactionFactory.getTransactionFactory().getTransactionContext().tearDownTxManager();
     }
-    
+
     protected static void setTxnConfigs() throws IOException {
-        config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
-        config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
-        config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
-        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-        config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, DEFAULT_TXN_TIMEOUT_SECONDS);
-        config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
-        config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
+        TransactionFactory.getTransactionFactory().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
     }
-    
-    protected static void setupTxManager() throws SQLException, IOException {
-        ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
-        zkClient = ZKClientServices.delegate(
-          ZKClients.reWatchOnExpire(
-            ZKClients.retryOnFailure(
-              ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
-                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
-                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
-                .build(),
-              RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
-            )
-          )
-        );
-        zkClient.startAndWait();
 
-        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
-        txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
-        txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
-        txService.startAndWait();
+    protected static void setupTxManager() throws SQLException, IOException {
+        TransactionFactory.getTransactionFactory().getTransactionContext().setupTxManager(config, getUrl());
     }
 
     private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {
@@ -499,9 +447,7 @@ public abstract class BaseTest {
     }
     
     private static void checkTxManagerInitialized(ReadOnlyProps clientProps) throws SQLException, IOException {
-        if (txService == null) {
-            setupTxManager();
-        }
+        setupTxManager();
     }
 
     /**