You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2020/12/21 14:53:19 UTC
[phoenix] branch master updated: PHOENIX-6274 : Deflake
TableSnapshotReadsMapReduceIT.testMapReduceSnapshotsMultiRegion
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 1dcb749 PHOENIX-6274 : Deflake TableSnapshotReadsMapReduceIT.testMapReduceSnapshotsMultiRegion
1dcb749 is described below
commit 1dcb74998589a9906973f208f7fc698814574639
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Sat Dec 19 17:34:26 2020 +0530
PHOENIX-6274 : Deflake TableSnapshotReadsMapReduceIT.testMapReduceSnapshotsMultiRegion
---
.../end2end/TableSnapshotReadsMapReduceIT.java | 145 +++++++++++++++++----
1 file changed, 123 insertions(+), 22 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 2f2a188..e493b17 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -33,15 +33,19 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
import java.util.UUID;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -79,9 +83,9 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
private static final String FIELD1 = "FIELD1";
private static final String FIELD2 = "FIELD2";
private static final String FIELD3 = "FIELD3";
- private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+ private static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
" FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
- private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+ private static final String UPSERT = "UPSERT into %s values (?, ?, ?)";
private static final String CREATE_STOCK_TABLE =
"CREATE TABLE IF NOT EXISTS %s ( " + STOCK_NAME + " VARCHAR NOT NULL , " + RECORDING_YEAR
+ " INTEGER NOT NULL, " + RECORDINGS_QUARTER + " "
@@ -96,11 +100,13 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
private Job job;
private Path tmpDir;
private Configuration conf;
+ private static final Random RANDOM = new Random();
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ getUtility().getAdmin().balancerSwitch(false, true);
}
@Before
@@ -268,7 +274,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next());
} finally {
- deleteSnapshot(tableName);
+ deleteSnapshotIfExists(SNAPSHOT_NAME);
}
}
@@ -300,6 +306,26 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
}
+ private void upsertDataBeforeSplit(String tableName) throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+ upsertData(stmt, "CCCC", "SSDD", RANDOM.nextInt());
+ for (int i = 0; i < 100; i++) {
+ upsertData(stmt, "AAAA" + i, "JHHA" + i, RANDOM.nextInt());
+ upsertData(stmt, "0000" + i, "JHHB" + i, RANDOM.nextInt());
+ upsertData(stmt, "9999" + i, "JHHC" + i, RANDOM.nextInt());
+ upsertData(stmt, "BBBB" + i, "JSHJ" + i, RANDOM.nextInt());
+ upsertData(stmt, "BBBB1" + i, "JSHK" + i, RANDOM.nextInt());
+ upsertData(stmt, "BBBB2" + i, "JSHL" + i, RANDOM.nextInt());
+ upsertData(stmt, "CCCC1" + i, "SSDE" + i, RANDOM.nextInt());
+ upsertData(stmt, "CCCC2" + i, "SSDF" + i, RANDOM.nextInt());
+ upsertData(stmt, "PPPP" + i, "AJDH" + i, RANDOM.nextInt());
+ upsertData(stmt, "SSSS" + i, "HSDG" + i, RANDOM.nextInt());
+ upsertData(stmt, "XXXX" + i, "HDPP" + i, RANDOM.nextInt());
+ }
+ conn.commit();
+ }
+
private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
stmt.setString(1, field1);
stmt.setString(2, field2);
@@ -308,17 +334,23 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
}
private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception {
- upsertData(tableName);
+ if (shouldSplit) {
+ // having very few rows in table doesn't really help much with splitting case.
+ // we should upsert large no of rows as a prerequisite to splitting
+ upsertDataBeforeSplit(tableName);
+ } else {
+ upsertData(tableName);
+ }
TableName hbaseTableName = TableName.valueOf(tableName);
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
if (shouldSplit) {
- splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2);
+ splitTableSync(admin, hbaseTableName, Bytes.toBytes("CCCC"), 2);
}
- admin.snapshot(SNAPSHOT_NAME, hbaseTableName);
+ snapshotCreateSync(hbaseTableName, admin, SNAPSHOT_NAME);
List<SnapshotDescription> snapshots = admin.listSnapshots();
Assert.assertEquals(tableName, snapshots.get(0).getTable());
@@ -334,26 +366,95 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
}
}
- private void splitTableSync(Admin admin, TableName hbaseTableName,
- byte[] splitPoint , int expectedRegions) throws IOException, InterruptedException {
- admin.split(hbaseTableName, splitPoint);
- for (int i = 0; i < 100; i++) {
- List<HRegionInfo> hRegionInfoList = admin.getTableRegions(hbaseTableName);
- if (hRegionInfoList.size() >= expectedRegions) {
+ private void snapshotCreateSync(TableName hbaseTableName,
+ Admin admin, String snapshotName) throws IOException, InterruptedException {
+ boolean isSnapshotCreated = false;
+ SnapshotDescription snapshotDescription =
+ new SnapshotDescription(snapshotName);
+ // 3 retries while creating snapshot. if all 3 retries exhausted, we have
+ // some valid issue.
+ for (int i = 0; i < 3; i++) {
+ if (isSnapshotCreated) {
break;
}
- LOGGER.info("Sleeping for 1000 ms while waiting for "
- + hbaseTableName.getNameAsString() + " to split");
+ if (i > 0) {
+ LOGGER.info("Retry count {} for snapshot creation", i);
+ }
+ try {
+ admin.snapshot(snapshotName, hbaseTableName);
+ } catch (Exception e) {
+ LOGGER.info("Snapshot creation failure for {}", snapshotName, e);
+ continue;
+ }
+ // verify if snapshot was created in 10s
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(1000);
+ try {
+ if (admin.isSnapshotFinished(snapshotDescription)) {
+ isSnapshotCreated = true;
+ break;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Snapshot creation failed.", e);
+ break;
+ }
+ }
+ }
+ if (!isSnapshotCreated) {
+ throw new IOException("Snapshot creation failed for " + snapshotName);
+ }
+ }
+
+ private void splitTableSync(Admin admin, TableName hbaseTableName,
+ byte[] splitPoint, int expectedRegions) throws IOException,
+ InterruptedException {
+ admin.split(hbaseTableName, splitPoint);
+ AssignmentManager assignmentManager =
+ getUtility().getHBaseCluster().getMaster().getAssignmentManager();
+ // wait for split daughter regions coming online for ~20s
+ for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
+ List<HRegion> regions = getUtility().getHBaseCluster()
+ .getRegions(hbaseTableName);
+ if (regions.size() >= expectedRegions) {
+ boolean allRegionsOnline = true;
+ for (HRegion region : regions) {
+ if (!assignmentManager.getRegionStates()
+ .isRegionOnline(region.getRegionInfo())) {
+ allRegionsOnline = false;
+ break;
+ }
+ }
+ if (allRegionsOnline) {
+ break;
+ }
+ }
+ LOGGER.info("Sleeping for 1000 ms while waiting for {} to split and all regions to come online",
+ hbaseTableName.getNameAsString());
}
}
- private void deleteSnapshot(String tableName) throws Exception {
- try (Connection conn = DriverManager.getConnection(getUrl());
- Admin admin =
- conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
- admin.deleteSnapshot(SNAPSHOT_NAME);
+ private void deleteSnapshotIfExists(String snapshotName) throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+ List<SnapshotDescription> snapshotDescriptions = admin.listSnapshots();
+ boolean isSnapshotPresent = false;
+ if (CollectionUtils.isNotEmpty(snapshotDescriptions)) {
+ for (SnapshotDescription snapshotDescription : snapshotDescriptions) {
+ if (snapshotName.equals(snapshotDescription.getName())) {
+ isSnapshotPresent = true;
+ break;
+ }
}
+ }
+ // delete snapshot only if exists and it is not corrupted
+ if (isSnapshotPresent) {
+ admin.deleteSnapshot(snapshotName);
+ } else {
+ LOGGER.info("Snapshot {} does not exist. Possibly corrupted due to region movements.",
+ snapshotName);
+ }
+ }
}
public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {