You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2019/05/13 00:19:01 UTC
[phoenix] branch 4.x-HBase-1.2 updated: PHOENIX-5233 Read-your-own
writes causes incorrect visibility with transactional tables (with Omid).
This is an automated email from the ASF dual-hosted git repository.
larsh pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
new 416f0ff PHOENIX-5233 Read-your-own writes causes incorrect visibility with transactional tables (with Omid).
416f0ff is described below
commit 416f0ff0fc998c78b38757dfa4b9bbd76fd30c2b
Author: Lars Hofhansl <la...@apache.org>
AuthorDate: Sun May 12 17:19:09 2019 -0700
PHOENIX-5233 Read-your-own writes causes incorrect visibility with transactional tables (with Omid).
---
.../java/org/apache/phoenix/tx/TransactionIT.java | 141 ++++++++++++++++++++-
.../transaction/OmidTransactionContext.java | 10 +-
2 files changed, 144 insertions(+), 7 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index b8eb70b..61180ce 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -38,6 +38,9 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,6 +61,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
@@ -92,7 +97,7 @@ public class TransactionIT extends ParallelStatsDisabledIT {
return TestUtil.filterTxParamData(Arrays.asList(new Object[][] {
{"TEPHRA"},{"OMID"}}),0);
}
-
+
@Test
public void testFailureToRollbackAfterDelete() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -133,6 +138,43 @@ public class TransactionIT extends ParallelStatsDisabledIT {
}
@Test
+    public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
+        // Each round runs an auto-committed UPSERT ... SELECT from the table into itself and
+        // expects it to copy exactly the rows that were present before the statement ran
+        // (1, 2, 4, ...), so the row count doubles every round.
+        final int rounds = 12;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        // Deliberately small mutation-batch and scan sizes; presumably so a single statement is
+        // processed in several pieces -- TODO confirm against the QueryServices documentation.
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Connection otherConn = DriverManager.getConnection(getUrl());
+                HBaseAdmin admin = driver
+                        .getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
+                        .getAdmin()) {
+            conn.setAutoCommit(true);
+            otherConn.setAutoCommit(true);
+            String tableName = generateUniqueName();
+            conn.createStatement().execute("CREATE SEQUENCE " + tableName + "_seq CACHE 1000");
+            conn.createStatement().execute("CREATE TABLE " + tableName
+                    + " (pk INTEGER PRIMARY KEY, val INTEGER) UPDATE_CACHE_FREQUENCY=3600000, TRANSACTIONAL=true,"
+                    + "TRANSACTION_PROVIDER='" + txProvider + "'");
+
+            // Seed the table with a single row.
+            conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES (NEXT VALUE FOR "
+                    + tableName + "_seq,1)");
+            // try-with-resources so both statements are closed even when an assertion fails.
+            try (PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName
+                    + " SELECT NEXT VALUE FOR " + tableName + "_seq, val FROM " + tableName);
+                    PreparedStatement query = otherConn
+                            .prepareStatement("SELECT COUNT(*) FROM " + tableName)) {
+                for (int i = 0; i < rounds; i++) {
+                    admin.split(TableName.valueOf(tableName));
+                    int upsertCount = stmt.executeUpdate();
+                    // 2^i rows existed before this round, so exactly 2^i rows must be copied.
+                    assertEquals(1 << i, upsertCount);
+                    try (ResultSet rs = query.executeQuery()) {
+                        assertTrue(rs.next());
+                        assertEquals(1L << (i + 1), rs.getLong(1));
+                    }
+                }
+            }
+        }
+    }
+
+ @Test
public void testWithMixOfTxProviders() throws Exception {
// No sense in running the test with every providers, so just run it with the default one
if (!TransactionFactory.Provider.valueOf(txProvider).equals(TransactionFactory.Provider.getDefault())) {
@@ -546,4 +588,101 @@ public class TransactionIT extends ParallelStatsDisabledIT {
}
}
}
+
+ private class ParallelQuery implements Runnable {
+ PreparedStatement query;
+ CountDownLatch started = new CountDownLatch(1);
+ AtomicBoolean done = new AtomicBoolean(false);
+ ConcurrentHashMap<Long, Long> failCounts = new ConcurrentHashMap<>();
+
+ public ParallelQuery(PreparedStatement query) {
+ this.query = query;
+ }
+
+ @Override
+ public void run() {
+ try {
+ started.countDown();
+ while(!done.get()) {
+ ResultSet rs = query.executeQuery();
+ assertTrue(rs.next());
+ long count = rs.getLong(1);
+ rs.close();
+ if (count != 0 && count != (int)Math.pow(2, 12)) {
+ failCounts.put(count, count);
+ }
+ }
+ } catch (SQLException x) {
+ throw new RuntimeException(x);
+ }
+ }
+ }
+
+ @Test
+ public void testParallelConnectionOnlySeesCommittedData() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
+ props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+ props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection otherConn = DriverManager.getConnection(getUrl());
+ HBaseAdmin admin = driver
+ .getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
+ .getAdmin();) {
+ conn.setAutoCommit(false);
+ otherConn.setAutoCommit(true);
+ String tableName = generateUniqueName();
+ conn.createStatement().execute("CREATE SEQUENCE " + tableName + "_seq CACHE 1000");
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + " (pk INTEGER PRIMARY KEY, val INTEGER) UPDATE_CACHE_FREQUENCY=3600000, TRANSACTIONAL=true,TRANSACTION_PROVIDER='"
+ + txProvider + "'");
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " SELECT NEXT VALUE FOR " + tableName + "_seq, val FROM " + tableName);
+ PreparedStatement seed = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (NEXT VALUE FOR " + tableName + "_seq,1)");
+
+ PreparedStatement query = conn.prepareStatement("SELECT COUNT(*) FROM " + tableName);
+ PreparedStatement otherQuery = otherConn
+ .prepareStatement("SELECT COUNT(*) FROM " + tableName);
+
+ // seed
+ seed.executeUpdate();
+ for (int i = 0; i < 12; i++) {
+ admin.split(TableName.valueOf(tableName));
+ int upsertCount = stmt.executeUpdate();
+ assertEquals((int) Math.pow(2, i), upsertCount);
+
+ // read-own-writes, this forces uncommitted data to the server
+ ResultSet rs = query.executeQuery();
+ assertTrue(rs.next());
+ assertEquals((int) Math.pow(2, i + 1), rs.getLong(1));
+ rs.close();
+
+ rs = otherQuery.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(0, rs.getLong(1));
+ rs.close();
+ }
+
+ ParallelQuery q = new ParallelQuery(otherQuery);
+ Thread t = new Thread(q);
+ t.start();
+ q.started.await();
+
+ conn.commit();
+
+ q.done.set(true);
+ t.join();
+
+ assertTrue(
+ "Expected 0 or 4096 but got these intermediary counts " + q.failCounts.keySet(),
+ q.failCounts.isEmpty());
+ ResultSet rs = otherQuery.executeQuery();
+ assertTrue(rs.next());
+ assertEquals((int) Math.pow(2, 12), rs.getLong(1));
+ rs.close();
+ }
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 392de78..1147e07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -153,12 +153,10 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
@Override
public void checkpoint(boolean hasUncommittedData) throws SQLException {
- if (hasUncommittedData) {
- try {
- tx.checkpoint();
- } catch (TransactionException e) {
- throw new SQLException(e);
- }
+ try { // checkpoint on every call; the removed lines above only did so when hasUncommittedData was true
+ tx.checkpoint();
+ } catch (TransactionException e) { // rethrown below as SQLException with e as its cause
+ throw new SQLException(e);
}
tx.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
}