You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/20 20:13:28 UTC

phoenix git commit: Add transaction logic for Delete (with failing test for now)

Repository: phoenix
Updated Branches:
  refs/heads/txn 9270db150 -> ea523e7c5


Add transaction logic for Delete (with failing test for now)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ea523e7c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ea523e7c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ea523e7c

Branch: refs/heads/txn
Commit: ea523e7c5d3a788838974ae581cde74c7b10de75
Parents: 9270db1
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Apr 20 11:13:23 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Apr 20 11:13:23 2015 -0700

----------------------------------------------------------------------
 .../phoenix/transactions/TransactionIT.java     | 46 ++++++++++++++++++++
 .../apache/phoenix/compile/DeleteCompiler.java  | 26 +++++++----
 2 files changed, 64 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea523e7c/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
index 31adcb9..8babaae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
@@ -141,6 +141,52 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         }
 	}
 	
+    @Test
+    public void testDelete() throws Exception {
+        String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        Connection conn2 = DriverManager.getConnection(getUrl());
+        try {
+            conn1.setAutoCommit(false);
+            ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsert);
+            // upsert two rows, committing only the first one
+            setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            conn1.commit();
+            
+            setRowKeyColumns(stmt, 2);
+            stmt.execute();
+            
+            // delete on the same connection: both the committed row and the uncommitted row should be deleted
+            int rowsDeleted = conn1.createStatement().executeUpdate("DELETE FROM " + FULL_TABLE_NAME);
+            assertEquals(2, rowsDeleted);
+            
+            // Delete and second upsert not committed yet, so conn2 should still see exactly one row.
+            // FIXME: aggregate queries don't appear to honor the transaction information
+            // rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+            rs = conn2.createStatement().executeQuery(selectSQL);
+            assertTrue(rs.next());
+            // FIXME: (see above)
+            // assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+            
+            conn1.commit();
+            
+            // verify rows are deleted after commit
+            // FIXME: this is failing, I think because Tephra isn't handling deletes like we need it to
+            // TODO: confirm this works once we get the patch from Gary.
+            rs = conn1.createStatement().executeQuery(selectSQL);
+            assertFalse(rs.next());
+        } finally {
+            conn1.close();
+            conn2.close(); // release the second connection too so it does not leak
+        }
+    }
+    
 	@Test
 	public void testAutoCommitQuerySingleTable() throws Exception {
 		Connection conn = DriverManager.getConnection(getUrl());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea523e7c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 0778f75..408c622 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -291,12 +291,13 @@ public class DeleteCompiler {
         boolean noQueryReqd = false;
         boolean runOnServer = false;
         SelectStatement select = null;
+        ColumnResolver resolverToBe = null;
         Set<PTable> immutableIndex = Collections.emptySet();
         DeletingParallelIteratorFactory parallelIteratorFactory = null;
         while (true) {
             try {
-                ColumnResolver resolver = FromCompiler.getResolverForMutation(delete, connection);
-                tableRefToBe = resolver.getTables().get(0);
+                resolverToBe = FromCompiler.getResolverForMutation(delete, connection);
+                tableRefToBe = resolverToBe.getTables().get(0);
                 PTable table = tableRefToBe.getTable();
                 // Cannot update:
                 // - read-only VIEW 
@@ -332,17 +333,17 @@ public class DeleteCompiler {
                         Collections.<ParseNode>emptyList(), null, 
                         delete.getOrderBy(), delete.getLimit(),
                         delete.getBindCount(), false, false);
-                select = StatementNormalizer.normalize(select, resolver);
-                SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, connection);
+                select = StatementNormalizer.normalize(select, resolverToBe);
+                SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection);
                 if (transformedSelect != select) {
-                    resolver = FromCompiler.getResolverForQuery(transformedSelect, connection);
-                    select = StatementNormalizer.normalize(transformedSelect, resolver);
+                    resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection);
+                    select = StatementNormalizer.normalize(transformedSelect, resolverToBe);
                 }
                 parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection);
                 QueryOptimizer optimizer = new QueryOptimizer(services);
                 queryPlans = Lists.newArrayList(mayHaveImmutableIndexes
-                        ? optimizer.getApplicablePlans(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory)
-                        : optimizer.getBestPlan(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory));
+                        ? optimizer.getApplicablePlans(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)
+                        : optimizer.getBestPlan(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory));
                 if (mayHaveImmutableIndexes) { // FIXME: this is ugly
                     // Lookup the table being deleted from in the cache, as it's possible that the
                     // optimizer updated the cache if it found indexes that were out of date.
@@ -367,6 +368,7 @@ public class DeleteCompiler {
             }
             break;
         }
+        final ColumnResolver resolver = resolverToBe;
         final boolean hasImmutableIndexes = !immutableIndex.isEmpty();
         // tableRefs is parallel with queryPlans
         TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1];
@@ -559,6 +561,14 @@ public class DeleteCompiler {
     
                     @Override
                     public MutationState execute() throws SQLException {
+                        // Repeated from PhoenixStatement.executeQuery which this call bypasses.
+                        // Send mutations to hbase, so they are visible to subsequent reads.
+                        // Use original plan for data table so that data and immutable indexes will be sent.
+                        boolean isTransactional = connection.getMutationState().startTransaction(resolver.getTables().iterator());
+                        if (isTransactional) {
+                            // Use the real query plan so that we have the right context object.
+                            plan.getContext().setTransaction(connection.getMutationState().getTransaction());
+                        }
                         ResultIterator iterator = plan.iterator();
                         if (!hasLimit) {
                             Tuple tuple;