You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/20 20:13:28 UTC
phoenix git commit: Add transaction logic for Delete (with failing
test for now)
Repository: phoenix
Updated Branches:
refs/heads/txn 9270db150 -> ea523e7c5
Add transaction logic for Delete (with failing test for now)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ea523e7c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ea523e7c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ea523e7c
Branch: refs/heads/txn
Commit: ea523e7c5d3a788838974ae581cde74c7b10de75
Parents: 9270db1
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Apr 20 11:13:23 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Apr 20 11:13:23 2015 -0700
----------------------------------------------------------------------
.../phoenix/transactions/TransactionIT.java | 46 ++++++++++++++++++++
.../apache/phoenix/compile/DeleteCompiler.java | 26 +++++++----
2 files changed, 64 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea523e7c/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
index 31adcb9..8babaae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
@@ -141,6 +141,52 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
}
}
+ @Test
+ public void testDelete() throws Exception {
+ String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ Connection conn2 = DriverManager.getConnection(getUrl());
+ try {
+ conn1.setAutoCommit(false);
+ ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+ assertFalse(rs.next());
+
+ String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn1.prepareStatement(upsert);
+ // upsert two rows
+ setRowKeyColumns(stmt, 1);
+ stmt.execute();
+ conn1.commit();
+
+ setRowKeyColumns(stmt, 2);
+ stmt.execute();
+
+ // verify the uncommitted upsert is visible within the same transaction: the delete below should remove both rows even though commit has not been called
+ int rowsDeleted = conn1.createStatement().executeUpdate("DELETE FROM " + FULL_TABLE_NAME);
+ assertEquals(2, rowsDeleted);
+
+ // Delete and second upsert not committed yet, so there should be one row.
+ // FIXME: aggregate queries don't appear to honor the transaction information
+ // rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+ rs = conn2.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME);
+ assertTrue(rs.next());
+ // FIXME: (see above)
+ // assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ conn1.commit();
+
+ // verify rows are deleted after commit
+ // FIXME: this is failing, I think because Tephra isn't handling deletes like we need it to
+ // TODO: confirm this works once we get the patch from Gary.
+ rs = conn1.createStatement().executeQuery(selectSQL);
+ assertFalse(rs.next());
+ }
+ finally {
+ conn1.close();
+ }
+ }
+
@Test
public void testAutoCommitQuerySingleTable() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea523e7c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 0778f75..408c622 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -291,12 +291,13 @@ public class DeleteCompiler {
boolean noQueryReqd = false;
boolean runOnServer = false;
SelectStatement select = null;
+ ColumnResolver resolverToBe = null;
Set<PTable> immutableIndex = Collections.emptySet();
DeletingParallelIteratorFactory parallelIteratorFactory = null;
while (true) {
try {
- ColumnResolver resolver = FromCompiler.getResolverForMutation(delete, connection);
- tableRefToBe = resolver.getTables().get(0);
+ resolverToBe = FromCompiler.getResolverForMutation(delete, connection);
+ tableRefToBe = resolverToBe.getTables().get(0);
PTable table = tableRefToBe.getTable();
// Cannot update:
// - read-only VIEW
@@ -332,17 +333,17 @@ public class DeleteCompiler {
Collections.<ParseNode>emptyList(), null,
delete.getOrderBy(), delete.getLimit(),
delete.getBindCount(), false, false);
- select = StatementNormalizer.normalize(select, resolver);
- SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, connection);
+ select = StatementNormalizer.normalize(select, resolverToBe);
+ SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection);
if (transformedSelect != select) {
- resolver = FromCompiler.getResolverForQuery(transformedSelect, connection);
- select = StatementNormalizer.normalize(transformedSelect, resolver);
+ resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection);
+ select = StatementNormalizer.normalize(transformedSelect, resolverToBe);
}
parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection);
QueryOptimizer optimizer = new QueryOptimizer(services);
queryPlans = Lists.newArrayList(mayHaveImmutableIndexes
- ? optimizer.getApplicablePlans(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory)
- : optimizer.getBestPlan(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory));
+ ? optimizer.getApplicablePlans(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)
+ : optimizer.getBestPlan(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory));
if (mayHaveImmutableIndexes) { // FIXME: this is ugly
// Lookup the table being deleted from in the cache, as it's possible that the
// optimizer updated the cache if it found indexes that were out of date.
@@ -367,6 +368,7 @@ public class DeleteCompiler {
}
break;
}
+ final ColumnResolver resolver = resolverToBe;
final boolean hasImmutableIndexes = !immutableIndex.isEmpty();
// tableRefs is parallel with queryPlans
TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1];
@@ -559,6 +561,14 @@ public class DeleteCompiler {
@Override
public MutationState execute() throws SQLException {
+ // Repeated from PhoenixStatement.executeQuery which this call bypasses.
+ // Send mutations to hbase, so they are visible to subsequent reads.
+ // Use original plan for data table so that data and immutable indexes will be sent.
+ boolean isTransactional = connection.getMutationState().startTransaction(resolver.getTables().iterator());
+ if (isTransactional) {
+ // Use real query plan so that we have the right context object.
+ plan.getContext().setTransaction(connection.getMutationState().getTransaction());
+ }
ResultIterator iterator = plan.iterator();
if (!hasLimit) {
Tuple tuple;