Posted to commits@phoenix.apache.org by ka...@apache.org on 2018/11/02 00:53:31 UTC
phoenix git commit: PHOENIX-4997 Phoenix MR on snapshots can produce duplicate rows
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.4 bbd31a9e0 -> 8ccf69f00
PHOENIX-4997 Phoenix MR on snapshots can produce duplicate rows
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ccf69f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ccf69f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ccf69f0
Branch: refs/heads/4.x-HBase-1.4
Commit: 8ccf69f00d318873ddf0e77e73bf8e5045fdb3c4
Parents: bbd31a9
Author: Karan Mehta <ka...@gmail.com>
Authored: Thu Nov 1 17:15:26 2018 -0700
Committer: Karan Mehta <ka...@gmail.com>
Committed: Thu Nov 1 17:49:55 2018 -0700
----------------------------------------------------------------------
.../end2end/TableSnapshotReadsMapReduceIT.java | 122 +++++++++++--------
.../iterate/MapReduceParallelScanGrouper.java | 32 ++++-
.../iterate/TableSnapshotResultIterator.java | 28 +++--
.../java/org/apache/phoenix/query/BaseTest.java | 14 +--
4 files changed, 122 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ccf69f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index cae91a3..e35e159 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -36,6 +36,7 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -49,12 +50,18 @@ import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
+
+ private static final Logger logger = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
+
private final static String SNAPSHOT_NAME = "FOO";
private static final String FIELD1 = "FIELD1";
private static final String FIELD2 = "FIELD2";
@@ -66,6 +73,9 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
private static List<List<Object>> result;
private long timestamp;
private String tableName;
+ private Job job;
+ private Path tmpDir;
+ private Configuration conf;
@BeforeClass
public static void doSetup() throws Exception {
@@ -73,8 +83,8 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
- @Test
- public void testMapReduceSnapshots() throws Exception {
+ @Before
+ public void before() throws SQLException, IOException {
// create table
Connection conn = DriverManager.getConnection(getUrl());
tableName = generateUniqueName();
@@ -82,58 +92,43 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
// configure Phoenix M/R job to read snapshot
- final Configuration conf = getUtility().getConfiguration();
- Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getRandomDir();
+ conf = getUtility().getConfiguration();
+ job = Job.getInstance(conf);
+ tmpDir = getUtility().getRandomDir();
+ }
- PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3);
+ @Test
+ public void testMapReduceSnapshots() throws Exception {
+ PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+ SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3);
+ configureJob(job, tableName, null, null, false);
+ }
- // configure and test job
- configureJob(job, tableName, null, null);
+ @Test
+ public void testMapReduceSnapshotsMultiRegion() throws Exception {
+ PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+ SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3);
+ configureJob(job, tableName, null, null, true);
}
@Test
public void testMapReduceSnapshotsWithCondition() throws Exception {
- // create table
- Connection conn = DriverManager.getConnection(getUrl());
- tableName = generateUniqueName();
- conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
- conn.commit();
-
- // configure Phoenix M/R job to read snapshot
- final Configuration conf = getUtility().getConfiguration();
- Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getRandomDir();
- PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
-
- // configure and test job
- configureJob(job, tableName, null, "FIELD3 > 0001");
-
+ PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+ SNAPSHOT_NAME, tableName, tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
+ configureJob(job, tableName, null, "FIELD3 > 0001", false);
}
@Test
public void testMapReduceSnapshotWithLimit() throws Exception {
- // create table
- Connection conn = DriverManager.getConnection(getUrl());
- tableName = generateUniqueName();
- conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
- conn.commit();
-
- // configure Phoenix M/R job to read snapshot
- final Configuration conf = getUtility().getConfiguration();
- Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getRandomDir();
- // Running limit with order by on non pk column
String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
- PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery);
-
- // configure and test job
- configureJob(job, tableName, inputQuery, null);
+ PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+ SNAPSHOT_NAME, tableName, tmpDir, inputQuery);
+ configureJob(job, tableName, inputQuery, null, false);
}
- private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception {
+ private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception {
try {
- upsertAndSnapshot(tableName);
+ upsertAndSnapshot(tableName, shouldSplit);
result = new ArrayList<>();
job.setMapperClass(TableSnapshotMapper.class);
@@ -151,6 +146,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
if (condition != null) {
selectQuery.append(" WHERE " + condition);
}
+
if (inputQuery == null)
inputQuery = selectQuery.toString();
@@ -176,12 +172,13 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
private void upsertData(String tableName) throws SQLException {
Connection conn = DriverManager.getConnection(getUrl());
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
- upsertData(stmt, "CCCC", "SSDD", 0001);
- upsertData(stmt, "CCCC", "HDHG", 0005);
- upsertData(stmt, "BBBB", "JSHJ", 0002);
- upsertData(stmt, "AAAA", "JHHD", 0003);
+ upsertData(stmt, "AAAA", "JHHD", 37);
+ upsertData(stmt, "BBBB", "JSHJ", 224);
+ upsertData(stmt, "CCCC", "SSDD", 15);
+ upsertData(stmt, "PPPP", "AJDG", 53);
+ upsertData(stmt, "SSSS", "HSDG", 59);
+ upsertData(stmt, "XXXX", "HDPP", 22);
conn.commit();
- timestamp = System.currentTimeMillis();
}
private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
@@ -191,31 +188,52 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
stmt.execute();
}
- public void upsertAndSnapshot(String tableName) throws Exception {
+ private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception {
upsertData(tableName);
+ TableName hbaseTableName = TableName.valueOf(tableName);
Connection conn = DriverManager.getConnection(getUrl());
HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
- // call flush to create new files in the region
- admin.flush(tableName);
+
+ if (shouldSplit) {
+ splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2);
+ }
+
+ admin.snapshot(SNAPSHOT_NAME, hbaseTableName);
List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
Assert.assertEquals(tableName, snapshots.get(0).getTable());
+ // Capture the snapshot timestamp to use as SCN while reading the table later
+ // Assigning the timestamp value here will make tests less flaky
+ timestamp = System.currentTimeMillis();
+
// upsert data after snapshot
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
- upsertData(stmt, "DDDD", "SNFB", 0004);
+ upsertData(stmt, "DDDD", "SNFB", 45);
conn.commit();
}
- public void deleteSnapshot(String tableName) throws Exception {
+ private void splitTableSync(HBaseAdmin admin, TableName hbaseTableName,
+ byte[] splitPoint , int expectedRegions) throws IOException, InterruptedException {
+ admin.split(hbaseTableName, splitPoint);
+ for (int i = 0; i < 100; i++) {
+ List<HRegionInfo> hRegionInfoList = admin.getTableRegions(hbaseTableName);
+ if (hRegionInfoList.size() >= expectedRegions) {
+ break;
+ }
+ logger.info("Sleeping for 1000 ms while waiting for " + hbaseTableName.getNameAsString() + " to split");
+ Thread.sleep(1000);
+ }
+ }
+
+ private void deleteSnapshot(String tableName) throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl());
HBaseAdmin admin =
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
admin.deleteSnapshot(SNAPSHOT_NAME);
}
- }
+ }
public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
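For context, a minimal sketch of how a Phoenix M/R job is pointed at a snapshot, using the same PhoenixMapReduceUtil.setInput overload the test above exercises. The snapshot name, table name, and restore path here are illustrative placeholders, not values from the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

public class SnapshotReadJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "phoenix-snapshot-read");
    // Snapshot name, Phoenix table, restore dir, optional WHERE condition
    // (null = none), then the columns to project.
    PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
        "FOO", "MY_TABLE", new Path("/tmp/phoenix-restore"), null,
        "FIELD1", "FIELD2", "FIELD3");
    // Mapper setup and job.waitForCompletion(true) would follow, as in
    // configureJob(...) in the test above.
  }
}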
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ccf69f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index 593608f..b4f81ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.iterate;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.List;
import com.google.common.base.Preconditions;
@@ -45,7 +46,7 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
private static final MapReduceParallelScanGrouper INSTANCE = new MapReduceParallelScanGrouper();
- public static MapReduceParallelScanGrouper getInstance() {
+ public static MapReduceParallelScanGrouper getInstance() {
return INSTANCE;
}
@@ -79,18 +80,39 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
}
}
+ /**
+ * Get list of region locations from SnapshotManifest
+ * BaseResultIterators assume that regions are sorted using RegionInfo.COMPARATOR
+ */
private List<HRegionLocation> getRegionLocationsFromManifest(SnapshotManifest manifest) {
List<SnapshotProtos.SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
Preconditions.checkNotNull(regionManifests);
- List<HRegionLocation> regionLocations = Lists.newArrayListWithCapacity(regionManifests.size());
+ List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
+ List<HRegionLocation> hRegionLocations = Lists.newArrayListWithCapacity(regionManifests.size());
for (SnapshotProtos.SnapshotRegionManifest regionManifest : regionManifests) {
- regionLocations.add(new HRegionLocation(
- HRegionInfo.convert(regionManifest.getRegionInfo()), null));
+ HRegionInfo regionInfo = HRegionInfo.convert(regionManifest.getRegionInfo());
+ if (isValidRegion(regionInfo)) {
+ regionInfos.add(regionInfo);
+ }
+ }
+
+ Collections.sort(regionInfos);
+
+ for (HRegionInfo regionInfo : regionInfos) {
+ hRegionLocations.add(new HRegionLocation(regionInfo, null));
}
- return regionLocations;
+ return hRegionLocations;
+ }
+
+ // Exclude offline split parent regions
+ private boolean isValidRegion(HRegionInfo hri) {
+ if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+ return false;
+ }
+ return true;
}
private String getSnapshotName(Configuration conf) {
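This hunk is the core of the fix: after a region split, the snapshot manifest still lists the offline parent region alongside its two daughters, and scanning the parent as well as the daughters returns every row in that key range twice. A minimal standalone sketch of the filter-and-sort logic added above, assuming the HBase 1.x HRegionInfo API:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo;

public class SnapshotRegionFilter {
  // Keep only live regions, sorted the way BaseResultIterators expects.
  public static List<HRegionInfo> liveSortedRegions(List<HRegionInfo> fromManifest) {
    List<HRegionInfo> live = new ArrayList<>(fromManifest.size());
    for (HRegionInfo hri : fromManifest) {
      // An offline split parent covers the same key range as its daughters,
      // so including it would yield every row in that range twice.
      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
        continue;
      }
      live.add(hri);
    }
    // HRegionInfo's natural ordering matches RegionInfo.COMPARATOR.
    Collections.sort(live);
    return live;
  }
}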
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ccf69f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index 016d3be..c3d75f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -79,22 +78,31 @@ public class TableSnapshotResultIterator implements ResultIterator {
RestoreSnapshotHelper.RestoreMetaChanges meta =
RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs,
this.rootDir, this.restoreDir, this.snapshotName);
- List restoredRegions = meta.getRegionsToAdd();
+ List<HRegionInfo> restoredRegions = meta.getRegionsToAdd();
this.htd = meta.getTableDescriptor();
- this.regions = new ArrayList(restoredRegions.size());
- Iterator i$ = restoredRegions.iterator();
-
- while(i$.hasNext()) {
- HRegionInfo hri = (HRegionInfo)i$.next();
- if(CellUtil.overlappingKeys(this.scan.getStartRow(), this.scan.getStopRow(),
- hri.getStartKey(), hri.getEndKey())) {
- this.regions.add(hri);
+ this.regions = new ArrayList<>(restoredRegions.size());
+
+ for (HRegionInfo restoredRegion : restoredRegions) {
+ if (isValidRegion(restoredRegion)) {
+ this.regions.add(restoredRegion);
}
}
Collections.sort(this.regions);
}
+ /**
+ * Exclude offline split parent regions and
+ * regions that don't intersect with provided scan
+ */
+ private boolean isValidRegion(HRegionInfo hri) {
+ if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+ return false;
+ }
+ return CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+ hri.getStartKey(), hri.getEndKey());
+ }
+
public boolean initSnapshotScanner() throws SQLException {
if (closed) {
return true;
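The snapshot result iterator applies the same split-parent exclusion, combined with a scan-range intersection test. A minimal sketch, assuming HBase 1.x's CellUtil.overlappingKeys (the same call the diff above uses); the method name shouldScanRegion is illustrative:

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Scan;

public class SnapshotScanRegionCheck {
  public static boolean shouldScanRegion(Scan scan, HRegionInfo hri) {
    // Same duplicate-row guard as in MapReduceParallelScanGrouper.
    if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
      return false;
    }
    // Keep only regions whose key range intersects the scan range; an empty
    // byte[] boundary means "unbounded" on that side.
    return CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
        hri.getStartKey(), hri.getEndKey());
  }
}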
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ccf69f0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index e80f2c4..94e8036 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -1782,15 +1782,15 @@ public abstract class BaseTest {
/**
- * Split SYSTEM.CATALOG at the given split point
+ * Synchronously split table at the given split point
*/
- protected static void splitRegion(byte[] splitPoint) throws SQLException, IOException, InterruptedException {
- HBaseAdmin admin =
+ protected static void splitRegion(TableName fullTableName, byte[] splitPoint) throws SQLException, IOException, InterruptedException {
+ HBaseAdmin admin =
driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
- admin.split(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME, splitPoint);
+ admin.split(fullTableName, splitPoint);
// make sure the split finishes (there's no synchronous splitting before HBase 2.x)
- admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME);
- admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME);
+ admin.disableTable(fullTableName);
+ admin.enableTable(fullTableName);
}
/**
@@ -1819,7 +1819,7 @@ public abstract class BaseTest {
AssignmentManager am = master.getAssignmentManager();
// No need to split on the first splitPoint since the end key of region boundaries are exclusive
for (int i=1; i<splitPoints.size(); ++i) {
- splitRegion(splitPoints.get(i));
+ splitRegion(fullTableName, splitPoints.get(i));
}
HashMap<ServerName, List<HRegionInfo>> serverToRegionsList = Maps.newHashMapWithExpectedSize(NUM_SLAVES_BASE);
Deque<ServerName> availableRegionServers = new ArrayDeque<ServerName>(NUM_SLAVES_BASE);
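
A minimal sketch of the generalized split helper above; the disable/enable cycle is the workaround the in-code comment mentions, since there is no synchronous split API before HBase 2.x. The method name splitAndWait is illustrative:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class SplitHelper {
  public static void splitAndWait(HBaseAdmin admin, TableName table, byte[] splitPoint)
      throws IOException {
    admin.split(table, splitPoint);
    // No synchronous split before HBase 2.x; cycling the table through
    // disable/enable makes the caller wait until the split's region
    // transitions have settled.
    admin.disableTable(table);
    admin.enableTable(table);
  }
}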