You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/02 04:59:38 UTC

[04/50] [abbrv] phoenix git commit: PHOENIX-2591 Minimize transaction commit/rollback for DDL

PHOENIX-2591 Minimize transaction commit/rollback for DDL


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f591da44
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f591da44
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f591da44

Branch: refs/heads/calcite
Commit: f591da44c9ee85ee7ab0fa910e3b18e649d86cdf
Parents: 9a44b49
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Jan 18 21:09:44 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Jan 18 21:09:44 2016 -0800

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   | 30 ++++++++++----------
 .../apache/phoenix/util/TransactionUtil.java    | 23 ++++++---------
 .../java/org/apache/phoenix/util/TestUtil.java  |  2 +-
 3 files changed, 24 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f591da44/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 35a36e6..ee694e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -37,18 +37,6 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionCodec;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.hbase11.TransactionAwareHTable;
-import co.cask.tephra.visibility.FenceWait;
-import co.cask.tephra.visibility.VisibilityFence;
-
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -110,6 +98,18 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionCodec;
+import co.cask.tephra.TransactionConflictException;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.hbase11.TransactionAwareHTable;
+import co.cask.tephra.visibility.FenceWait;
+import co.cask.tephra.visibility.VisibilityFence;
+
 /**
  * 
  * Tracks the uncommitted state
@@ -369,9 +369,9 @@ public class MutationState implements SQLCloseable {
     	return getTransaction() != null;
     }
     
-    public long getReadPointer() {
-    	Transaction tx = getTransaction();
-    	return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getReadPointer();
+    public long getInitialWritePointer() {
+        Transaction tx = getTransaction();
+        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getTransactionId(); // First write pointer - won't change with checkpointing
     }
     
     // For testing

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f591da44/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 9f16e52..041c12e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -19,6 +19,11 @@ package org.apache.phoenix.util;
 
 import java.sql.SQLException;
 
+import co.cask.tephra.TransactionConflictException;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase11.TransactionAwareHTable;
+
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -27,11 +32,6 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase11.TransactionAwareHTable;
-
 public class TransactionUtil {
     private TransactionUtil() {
     }
@@ -67,7 +67,7 @@ public class TransactionUtil {
 	public static long getResolvedTimestamp(PhoenixConnection connection, boolean isTransactional, long defaultResolvedTimestamp) {
 		MutationState mutationState = connection.getMutationState();
 		Long scn = connection.getSCN();
-	    return scn != null ?  scn : (isTransactional && mutationState.isTransactionStarted()) ? convertToMilliseconds(mutationState.getReadPointer()) : defaultResolvedTimestamp;
+	    return scn != null ?  scn : (isTransactional && mutationState.isTransactionStarted()) ? convertToMilliseconds(mutationState.getInitialWritePointer()) : defaultResolvedTimestamp;
 	}
 
 	public static long getResolvedTime(PhoenixConnection connection, MetaDataMutationResult result) {
@@ -80,7 +80,7 @@ public class TransactionUtil {
 		PTable table = result.getTable();
 		MutationState mutationState = connection.getMutationState();
 		boolean txInProgress = table != null && table.isTransactional() && mutationState.isTransactionStarted();
-		return  txInProgress ? convertToMilliseconds(mutationState.getReadPointer()) : result.getMutationTime();
+		return  txInProgress ? convertToMilliseconds(mutationState.getInitialWritePointer()) : result.getMutationTime();
 	}
 
 	public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional) throws SQLException {
@@ -89,17 +89,10 @@ public class TransactionUtil {
 			return timestamp;
 		}
 		MutationState mutationState = connection.getMutationState();
-		// we need to burn a txn so that we are sure the txn read pointer is close to wall clock time
 		if (!mutationState.isTransactionStarted()) {
 			mutationState.startTransaction();
-			connection.commit();
-		}
-		else {
-			connection.commit();
 		}
-		mutationState.startTransaction();
-		timestamp = convertToMilliseconds(mutationState.getReadPointer());
-		connection.commit();
+		timestamp = convertToMilliseconds(mutationState.getInitialWritePointer());
 		return timestamp;
 	}
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f591da44/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index e1258e4..66a3c65 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -563,7 +563,7 @@ public class TestUtil {
         String query = "UPDATE STATISTICS " + tableName;
         conn.createStatement().execute(query);
         // if the table is transactional burn a txn in order to make sure the next txn read pointer is close to wall clock time
-        TransactionUtil.getTableTimestamp(conn.unwrap(PhoenixConnection.class), transactional);
+        conn.commit();
     }
     
     public static void analyzeTableIndex(Connection conn, String tableName) throws IOException, SQLException {