You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2020/12/21 14:53:19 UTC

[phoenix] branch master updated: PHOENIX-6274 : Deflake TableSnapshotReadsMapReduceIT.testMapReduceSnapshotsMultiRegion

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dcb749  PHOENIX-6274 : Deflake TableSnapshotReadsMapReduceIT.testMapReduceSnapshotsMultiRegion
1dcb749 is described below

commit 1dcb74998589a9906973f208f7fc698814574639
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Sat Dec 19 17:34:26 2020 +0530

    PHOENIX-6274 : Deflake TableSnapshotReadsMapReduceIT.testMapReduceSnapshotsMultiRegion
---
 .../end2end/TableSnapshotReadsMapReduceIT.java     | 145 +++++++++++++++++----
 1 file changed, 123 insertions(+), 22 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 2f2a188..e493b17 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -33,15 +33,19 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.UUID;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -79,9 +83,9 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
-  private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+  private static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
-  private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String UPSERT = "UPSERT into %s values (?, ?, ?)";
   private static final String CREATE_STOCK_TABLE =
           "CREATE TABLE IF NOT EXISTS %s ( " + STOCK_NAME + " VARCHAR NOT NULL , " + RECORDING_YEAR
                   + "  INTEGER NOT  NULL,  " + RECORDINGS_QUARTER + " "
@@ -96,11 +100,13 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   private Job job;
   private Path tmpDir;
   private Configuration conf;
+  private static final Random RANDOM = new Random();
 
   @BeforeClass
   public static synchronized void doSetup() throws Exception {
       Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
       setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+      getUtility().getAdmin().balancerSwitch(false, true);
   }
 
   @Before
@@ -268,7 +274,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
 
       assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next());
     } finally {
-      deleteSnapshot(tableName);
+      deleteSnapshotIfExists(SNAPSHOT_NAME);
     }
   }
 
@@ -300,6 +306,26 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     conn.commit();
   }
 
+  private void upsertDataBeforeSplit(String tableName) throws SQLException {
+    Connection conn = DriverManager.getConnection(getUrl());
+    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+    upsertData(stmt, "CCCC", "SSDD", RANDOM.nextInt());
+    for (int i = 0; i < 100; i++) {
+      upsertData(stmt, "AAAA" + i, "JHHA" + i, RANDOM.nextInt());
+      upsertData(stmt, "0000" + i, "JHHB" + i, RANDOM.nextInt());
+      upsertData(stmt, "9999" + i, "JHHC" + i, RANDOM.nextInt());
+      upsertData(stmt, "BBBB" + i, "JSHJ" + i, RANDOM.nextInt());
+      upsertData(stmt, "BBBB1" + i, "JSHK" + i, RANDOM.nextInt());
+      upsertData(stmt, "BBBB2" + i, "JSHL" + i, RANDOM.nextInt());
+      upsertData(stmt, "CCCC1" + i, "SSDE" + i, RANDOM.nextInt());
+      upsertData(stmt, "CCCC2" + i, "SSDF" + i, RANDOM.nextInt());
+      upsertData(stmt, "PPPP" + i, "AJDH" + i, RANDOM.nextInt());
+      upsertData(stmt, "SSSS" + i, "HSDG" + i, RANDOM.nextInt());
+      upsertData(stmt, "XXXX" + i, "HDPP" + i, RANDOM.nextInt());
+    }
+    conn.commit();
+  }
+
   private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
     stmt.setString(1, field1);
     stmt.setString(2, field2);
@@ -308,17 +334,23 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   }
 
   private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception {
-    upsertData(tableName);
+    if (shouldSplit) {
+      // having very few rows in table doesn't really help much with splitting case.
+      // we should upsert large no of rows as a prerequisite to splitting
+      upsertDataBeforeSplit(tableName);
+    } else {
+      upsertData(tableName);
+    }
 
     TableName hbaseTableName = TableName.valueOf(tableName);
-    try (Connection conn = DriverManager.getConnection(getUrl())) {
-      Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+    try (Connection conn = DriverManager.getConnection(getUrl());
+         Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
 
       if (shouldSplit) {
-        splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2);
+        splitTableSync(admin, hbaseTableName, Bytes.toBytes("CCCC"), 2);
       }
 
-      admin.snapshot(SNAPSHOT_NAME, hbaseTableName);
+      snapshotCreateSync(hbaseTableName, admin, SNAPSHOT_NAME);
 
       List<SnapshotDescription> snapshots = admin.listSnapshots();
       Assert.assertEquals(tableName, snapshots.get(0).getTable());
@@ -334,26 +366,95 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     }
   }
 
-  private void splitTableSync(Admin admin, TableName hbaseTableName,
-                              byte[] splitPoint , int expectedRegions) throws IOException, InterruptedException {
-    admin.split(hbaseTableName, splitPoint);
-    for (int i = 0; i < 100; i++) {
-      List<HRegionInfo> hRegionInfoList = admin.getTableRegions(hbaseTableName);
-      if (hRegionInfoList.size() >= expectedRegions) {
+  private void snapshotCreateSync(TableName hbaseTableName,
+      Admin admin, String snapshotName) throws IOException, InterruptedException {
+    boolean isSnapshotCreated = false;
+    SnapshotDescription snapshotDescription =
+      new SnapshotDescription(snapshotName);
+    // 3 retries while creating snapshot. if all 3 retries exhausted, we have
+    // some valid issue.
+    for (int i = 0; i < 3; i++) {
+      if (isSnapshotCreated) {
         break;
       }
-      LOGGER.info("Sleeping for 1000 ms while waiting for "
-              + hbaseTableName.getNameAsString() + " to split");
+      if (i > 0) {
+        LOGGER.info("Retry count {} for snapshot creation", i);
+      }
+      try {
+        admin.snapshot(snapshotName, hbaseTableName);
+      } catch (Exception e) {
+        LOGGER.info("Snapshot creation failure for {}", snapshotName, e);
+        continue;
+      }
+      // verify if snapshot was created in 10s
+      for (int j = 0; j < 10; j++) {
+        Thread.sleep(1000);
+        try {
+          if (admin.isSnapshotFinished(snapshotDescription)) {
+            isSnapshotCreated = true;
+            break;
+          }
+        } catch (Exception e) {
+          LOGGER.error("Snapshot creation failed.", e);
+          break;
+        }
+      }
+    }
+    if (!isSnapshotCreated) {
+      throw new IOException("Snapshot creation failed for " + snapshotName);
+    }
+  }
+
+  private void splitTableSync(Admin admin, TableName hbaseTableName,
+      byte[] splitPoint, int expectedRegions) throws IOException,
+      InterruptedException {
+    admin.split(hbaseTableName, splitPoint);
+    AssignmentManager assignmentManager =
+      getUtility().getHBaseCluster().getMaster().getAssignmentManager();
+    // wait for split daughter regions coming online for ~20s
+    for (int i = 0; i < 20; i++) {
       Thread.sleep(1000);
+      List<HRegion> regions = getUtility().getHBaseCluster()
+        .getRegions(hbaseTableName);
+      if (regions.size() >= expectedRegions) {
+        boolean allRegionsOnline = true;
+        for (HRegion region : regions) {
+          if (!assignmentManager.getRegionStates()
+              .isRegionOnline(region.getRegionInfo())) {
+            allRegionsOnline = false;
+            break;
+          }
+        }
+        if (allRegionsOnline) {
+          break;
+        }
+      }
+      LOGGER.info("Sleeping for 1000 ms while waiting for {} to split and all regions to come online",
+        hbaseTableName.getNameAsString());
     }
   }
 
-  private void deleteSnapshot(String tableName) throws Exception {
-        try (Connection conn = DriverManager.getConnection(getUrl());
-                Admin admin =
-                        conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
-            admin.deleteSnapshot(SNAPSHOT_NAME);
+  private void deleteSnapshotIfExists(String snapshotName) throws Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl());
+         Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+      List<SnapshotDescription> snapshotDescriptions = admin.listSnapshots();
+      boolean isSnapshotPresent = false;
+      if (CollectionUtils.isNotEmpty(snapshotDescriptions)) {
+        for (SnapshotDescription snapshotDescription : snapshotDescriptions) {
+          if (snapshotName.equals(snapshotDescription.getName())) {
+            isSnapshotPresent = true;
+            break;
+          }
         }
+      }
+      // delete snapshot only if exists and it is not corrupted
+      if (isSnapshotPresent) {
+        admin.deleteSnapshot(snapshotName);
+      } else {
+        LOGGER.info("Snapshot {} does not exist. Possibly corrupted due to region movements.",
+          snapshotName);
+      }
+    }
   }
 
   public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {