You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/15 18:34:35 UTC

[07/40] phoenix git commit: PHOENIX-4329 Test IndexScrutinyTool while table is taking writes (Vincent Poon)

PHOENIX-4329 Test IndexScrutinyTool while table is taking writes (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/798ebb4a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/798ebb4a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/798ebb4a

Branch: refs/heads/4.x-HBase-1.2
Commit: 798ebb4a7037009e57bca771a436e1135e39e0c1
Parents: a918955
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 29 15:20:23 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 15 10:02:13 2017 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/IndexScrutinyToolIT.java    | 101 ++++++++++++++++++-
 1 file changed, 96 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/798ebb4a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 10595a7..cbce7b2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -36,6 +36,9 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.io.IOUtils;
@@ -43,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
@@ -103,6 +107,7 @@ public class IndexScrutinyToolIT extends BaseTest {
     private PreparedStatement indexTableUpsertStmt;
 
     private long testTime;
+    private Properties props;
 
     @Parameterized.Parameters
     public static Collection<Object[]> data() {
@@ -120,8 +125,11 @@ public class IndexScrutinyToolIT extends BaseTest {
 
     @BeforeClass
     public static void doSetup() throws Exception {
-        Map<String, String> props = Maps.newHashMap();
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        Map<String, String> serverProps = Maps.newHashMap();
+        // server-side setting: a period of "0" disables periodic major compactions
+        serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
+        Map<String, String> clientProps = Maps.newHashMap(); // intentionally empty: no client-side overrides
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
     /**
@@ -133,7 +141,7 @@ public class IndexScrutinyToolIT extends BaseTest {
         createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
         createTestTable(getUrl(),
             String.format(indexTableDdl, indexTableName, dataTableFullName));
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         conn = DriverManager.getConnection(getUrl(), props);
         String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
         dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
@@ -141,6 +149,7 @@ public class IndexScrutinyToolIT extends BaseTest {
         indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
         conn.setAutoCommit(false);
         testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+
     }
 
     @After
@@ -177,6 +186,77 @@ public class IndexScrutinyToolIT extends BaseTest {
     }
 
     /**
+     * Tests running a scrutiny while background upserts and deletes are happening.
+     * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
+     */
+    @Test
+    public void testScrutinyWhileTakingWrites() throws Exception {
+        for (int id = 0; id < 1000; id++) {
+            int index = 1;
+            dataTableUpsertStmt.setInt(index++, id);
+            dataTableUpsertStmt.setString(index++, "name-" + id);
+            dataTableUpsertStmt.setInt(index++, id);
+            dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
+            dataTableUpsertStmt.executeUpdate();
+        }
+        conn.commit();
+
+        //CURRENT_SCN for scrutiny
+        long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
+
+        // launch background upserts and deletes
+        final Random random = new Random(0);
+        Runnable backgroundUpserts = new Runnable() {
+            @Override
+            public void run() {
+                int idToUpsert = random.nextInt(1000);
+                try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                        PreparedStatement dataPS = conn.prepareStatement(
+                            String.format(UPSERT_SQL, dataTableFullName))) {
+                    upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
+                    conn.commit();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        Runnable backgroundDeletes = new Runnable() {
+            @Override
+            public void run() {
+                int idToDelete = random.nextInt(1000);
+                String deleteSql = String.format(DELETE_SQL, indexTableFullName)
+                        + "WHERE \":ID\"=" + idToDelete;
+                try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                        PreparedStatement deletePS = conn.prepareStatement(deleteSql)) {
+                    deletePS.executeUpdate();
+                    conn.commit();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
+        try {
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200,
+                TimeUnit.MILLISECONDS);
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200,
+                TimeUnit.MILLISECONDS);
+            // scrutiny should report everything as ok
+            List<Job> completedJobs = runScrutinyCurrentSCN(schemaName, dataTableName,
+                indexTableName, scrutinyTS);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+        } finally {
+            // stop the background writers even when the scrutiny or an assertion fails
+            scheduledThreadPool.shutdown();
+            scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
      * Tests an index with the same # of rows as the data table, but one of the index rows is
      * incorrect Scrutiny should report the invalid rows.
      */
@@ -570,6 +650,13 @@ public class IndexScrutinyToolIT extends BaseTest {
     private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
             SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
             Long maxOutputRows) {
+        return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
+            outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE); // scrutinyTs defaults to Long.MAX_VALUE
+    }
+
+    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
+            Long maxOutputRows, Long scrutinyTs) {
         final List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
@@ -591,7 +678,7 @@ public class IndexScrutinyToolIT extends BaseTest {
             args.add(outputDir);
         }
         args.add("-t");
-        args.add(String.valueOf(Long.MAX_VALUE));
+        args.add(String.valueOf(scrutinyTs));
         args.add("-run-foreground");
         if (batchSize != null) {
             args.add("-b");
@@ -619,6 +706,10 @@ public class IndexScrutinyToolIT extends BaseTest {
         return args.toArray(new String[0]);
     }
 
+    private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
+        return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, false, null, null, scrutinyTS));
+    }
+
     private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
         return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
     }
@@ -632,7 +723,7 @@ public class IndexScrutinyToolIT extends BaseTest {
             Long batchSize, SourceTable sourceTable) throws Exception {
         final String[] cmdArgs =
                 getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false,
-                    null, null);
+                    null, null, Long.MAX_VALUE);
         return runScrutiny(cmdArgs);
     }