You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2019/05/13 00:19:01 UTC

[phoenix] branch 4.x-HBase-1.2 updated: PHOENIX-5233 Read-your-own writes causes incorrect visibility with transactional tables (with Omid).

This is an automated email from the ASF dual-hosted git repository.

larsh pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
     new 416f0ff  PHOENIX-5233 Read-your-own writes causes incorrect visibility with transactional tables (with Omid).
416f0ff is described below

commit 416f0ff0fc998c78b38757dfa4b9bbd76fd30c2b
Author: Lars Hofhansl <la...@apache.org>
AuthorDate: Sun May 12 17:19:09 2019 -0700

    PHOENIX-5233 Read-your-own writes causes incorrect visibility with transactional tables (with Omid).
---
 .../java/org/apache/phoenix/tx/TransactionIT.java  | 141 ++++++++++++++++++++-
 .../transaction/OmidTransactionContext.java        |  10 +-
 2 files changed, 144 insertions(+), 7 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index b8eb70b..61180ce 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -38,6 +38,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,6 +61,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
@@ -92,7 +97,7 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
         return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { 
                  {"TEPHRA"},{"OMID"}}),0);
     }
-    
+
     @Test
     public void testFailureToRollbackAfterDelete() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -133,6 +138,43 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
     }
     
     @Test
+    public void testUpsertSelectDoesntSeeUpsertedData() throws Exception { // self-referencing UPSERT SELECT must exactly double the table each round
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); // small values, applied to conn only; presumably to force several mutation batches per statement - TODO confirm
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+             Connection otherConn = DriverManager.getConnection(getUrl()); // second connection, opened without the tuned props
+             HBaseAdmin admin = driver
+                        .getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
+                        .getAdmin();) {
+            conn.setAutoCommit(true);
+            otherConn.setAutoCommit(true);
+            String tableName = generateUniqueName();
+            conn.createStatement().execute("CREATE SEQUENCE " + tableName + "_seq CACHE 1000"); // sequence supplies the primary keys for every upsert below
+            conn.createStatement().execute("CREATE TABLE " + tableName
+                    + " (pk INTEGER PRIMARY KEY, val INTEGER) UPDATE_CACHE_FREQUENCY=3600000, TRANSACTIONAL=true,"
+                    + "TRANSACTION_PROVIDER='" + txProvider + "'");
+
+            conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES (NEXT VALUE FOR " // seed the table with a single row
+                    + tableName + "_seq,1)");
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName // re-inserts every existing row's val under a fresh sequence key
+                    + " SELECT NEXT VALUE FOR " + tableName + "_seq, val FROM " + tableName);
+            PreparedStatement query = otherConn
+                    .prepareStatement("SELECT COUNT(*) FROM " + tableName);
+            for (int i = 0; i < 12; i++) { // 12 doublings: 1 row grows to 2^12
+                admin.split(TableName.valueOf(tableName)); // request a split of the table before each round
+                int upsertCount = stmt.executeUpdate();
+                assertEquals((int) Math.pow(2, i), upsertCount); // must upsert exactly the 2^i rows that existed before this statement ran
+                ResultSet rs = query.executeQuery();
+                assertTrue(rs.next());
+                assertEquals((int) Math.pow(2, i + 1), rs.getLong(1)); // the other connection must now count 2^(i+1) rows
+                rs.close();
+            }
+        }
+    }
+
+    @Test
     public void testWithMixOfTxProviders() throws Exception {
         // No sense in running the test with every providers, so just run it with the default one
         if (!TransactionFactory.Provider.valueOf(txProvider).equals(TransactionFactory.Provider.getDefault())) {
@@ -546,4 +588,101 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
             }
         }
     }
+
+    private class ParallelQuery implements Runnable { // re-runs a count query in a loop until stopped, recording every unexpected count it sees
+        PreparedStatement query; // statement to execute repeatedly; column 1 of its first row is read as the count
+        CountDownLatch started = new CountDownLatch(1); // released as soon as run() begins
+        AtomicBoolean done = new AtomicBoolean(false); // set to true by the owner to end the loop
+        ConcurrentHashMap<Long, Long> failCounts = new ConcurrentHashMap<>(); // unexpected counts, stored as key and value (used as a set)
+
+        public ParallelQuery(PreparedStatement query) {
+            this.query = query;
+        }
+
+        @Override
+        public void run() {
+            try {
+                started.countDown(); // signal the waiting test thread before the first query
+                while(!done.get()) {
+                    ResultSet rs = query.executeQuery();
+                    assertTrue(rs.next()); // NOTE(review): runs on the worker thread; confirm a failure here is surfaced to the test
+                    long count = rs.getLong(1);
+                    rs.close();
+                    if (count != 0 && count != (int)Math.pow(2, 12)) { // only 0 or 4096 (2^12) are acceptable; anything else is recorded
+                        failCounts.put(count, count);
+                    }
+                }
+            } catch (SQLException x) {
+                throw new RuntimeException(x); // run() cannot throw checked exceptions, so wrap and rethrow
+            }
+        }
+    }
+
+    @Test
+    public void testParallelConnectionOnlySeesCommittedData() throws Exception { // a second connection must count 0 rows until commit, then all 4096
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); // small values, applied to conn only; presumably to force several mutation batches per statement - TODO confirm
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+             Connection otherConn = DriverManager.getConnection(getUrl());
+             HBaseAdmin admin = driver
+                        .getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
+                        .getAdmin();) {
+            conn.setAutoCommit(false); // writer: nothing is committed until the explicit conn.commit() below
+            otherConn.setAutoCommit(true); // observer connection
+            String tableName = generateUniqueName();
+            conn.createStatement().execute("CREATE SEQUENCE " + tableName + "_seq CACHE 1000");
+            conn.createStatement().execute("CREATE TABLE " + tableName
+                    + " (pk INTEGER PRIMARY KEY, val INTEGER) UPDATE_CACHE_FREQUENCY=3600000, TRANSACTIONAL=true,TRANSACTION_PROVIDER='"
+                    + txProvider + "'");
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName // re-inserts every existing row's val under a fresh sequence key
+                    + " SELECT NEXT VALUE FOR " + tableName + "_seq, val FROM " + tableName);
+            PreparedStatement seed = conn.prepareStatement("UPSERT INTO " + tableName
+                    + " VALUES (NEXT VALUE FOR " + tableName + "_seq,1)");
+
+            PreparedStatement query = conn.prepareStatement("SELECT COUNT(*) FROM " + tableName); // count as seen by the writing connection
+            PreparedStatement otherQuery = otherConn
+                    .prepareStatement("SELECT COUNT(*) FROM " + tableName); // count as seen by the observer connection
+
+            // seed: insert one initial row on the writer connection (not committed yet)
+            seed.executeUpdate();
+            for (int i = 0; i < 12; i++) { // 12 doublings: 1 row grows to 2^12
+                admin.split(TableName.valueOf(tableName)); // request a split of the table before each round
+                int upsertCount = stmt.executeUpdate();
+                assertEquals((int) Math.pow(2, i), upsertCount); // must upsert exactly the 2^i rows that existed before this statement ran
+
+                // read-own-writes, this forces uncommitted data to the server
+                ResultSet rs = query.executeQuery();
+                assertTrue(rs.next());
+                assertEquals((int) Math.pow(2, i + 1), rs.getLong(1)); // the writer sees its own 2^(i+1) uncommitted rows
+                rs.close();
+
+                rs = otherQuery.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(0, rs.getLong(1)); // the observer must see none of the uncommitted rows
+                rs.close();
+            }
+
+            ParallelQuery q = new ParallelQuery(otherQuery); // poll the observer's count on a separate thread while the commit runs
+            Thread t = new Thread(q);
+            t.start();
+            q.started.await(); // do not commit until the polling thread has begun running
+
+            conn.commit();
+
+            q.done.set(true); // stop the polling loop and wait for the thread to exit
+            t.join();
+
+            assertTrue(
+                    "Expected 0 or 4096 but got these intermediary counts " + q.failCounts.keySet(),
+                    q.failCounts.isEmpty()); // any recorded count means a partially committed state was visible
+            ResultSet rs = otherQuery.executeQuery();
+            assertTrue(rs.next());
+            assertEquals((int) Math.pow(2, 12), rs.getLong(1)); // after the commit the observer sees all 4096 rows
+            rs.close();
+        }
+    }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 392de78..1147e07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -153,12 +153,10 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
 
     @Override
     public void checkpoint(boolean hasUncommittedData) throws SQLException {
-        if (hasUncommittedData) {
-            try {
-                tx.checkpoint();
-            } catch (TransactionException e) {
-                throw new SQLException(e);
-            }
+        try {
+            tx.checkpoint(); // checkpoint on every call, regardless of hasUncommittedData
+        } catch (TransactionException e) {
+            throw new SQLException(e);
         }
         tx.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
     }