Posted to commits@phoenix.apache.org by ka...@apache.org on 2018/11/02 00:53:31 UTC

phoenix git commit: PHOENIX-4997 Phoenix MR on snapshots can produce duplicate rows

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.4 bbd31a9e0 -> 8ccf69f00


PHOENIX-4997 Phoenix MR on snapshots can produce duplicate rows


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ccf69f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ccf69f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ccf69f0

Branch: refs/heads/4.x-HBase-1.4
Commit: 8ccf69f00d318873ddf0e77e73bf8e5045fdb3c4
Parents: bbd31a9
Author: Karan Mehta <ka...@gmail.com>
Authored: Thu Nov 1 17:15:26 2018 -0700
Committer: Karan Mehta <ka...@gmail.com>
Committed: Thu Nov 1 17:49:55 2018 -0700

----------------------------------------------------------------------
 .../end2end/TableSnapshotReadsMapReduceIT.java  | 122 +++++++++++--------
 .../iterate/MapReduceParallelScanGrouper.java   |  32 ++++-
 .../iterate/TableSnapshotResultIterator.java    |  28 +++--
 .../java/org/apache/phoenix/query/BaseTest.java |  14 +--
 4 files changed, 122 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ccf69f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index cae91a3..e35e159 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -36,6 +36,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -49,12 +50,18 @@ import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
+
+  private static final Logger logger = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
+
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
@@ -66,6 +73,9 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   private static List<List<Object>> result;
   private long timestamp;
   private String tableName;
+  private Job job;
+  private Path tmpDir;
+  private Configuration conf;
 
   @BeforeClass
   public static void doSetup() throws Exception {
@@ -73,8 +83,8 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
       setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
-  @Test
-  public void testMapReduceSnapshots() throws Exception {
+  @Before
+  public void before() throws SQLException, IOException {
     // create table
     Connection conn = DriverManager.getConnection(getUrl());
     tableName = generateUniqueName();
@@ -82,58 +92,43 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     conn.commit();
 
     // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
+    conf = getUtility().getConfiguration();
+    job = Job.getInstance(conf);
+    tmpDir = getUtility().getRandomDir();
+  }
 
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3);
+  @Test
+  public void testMapReduceSnapshots() throws Exception {
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+            SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+  }
 
-    // configure and test job
-    configureJob(job, tableName, null, null);
+  @Test
+  public void testMapReduceSnapshotsMultiRegion() throws Exception {
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+            SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, true);
   }
 
   @Test
   public void testMapReduceSnapshotsWithCondition() throws Exception {
-    // create table
-    Connection conn = DriverManager.getConnection(getUrl());
-    tableName = generateUniqueName();
-    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-    conn.commit();
-
-    // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
-
-    // configure and test job
-    configureJob(job, tableName, null, "FIELD3 > 0001");
-
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+            SNAPSHOT_NAME, tableName, tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, "FIELD3 > 0001", false);
   }
 
   @Test
   public void testMapReduceSnapshotWithLimit() throws Exception {
-    // create table
-    Connection conn = DriverManager.getConnection(getUrl());
-    tableName = generateUniqueName();
-    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-    conn.commit();
-
-    // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
-    // Running limit with order by on non pk column
     String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery);
-
-    // configure and test job
-    configureJob(job, tableName, inputQuery, null);
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+            SNAPSHOT_NAME, tableName, tmpDir, inputQuery);
+    configureJob(job, tableName, inputQuery, null, false);
   }
 
-  private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception {
+  private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception {
     try {
-      upsertAndSnapshot(tableName);
+      upsertAndSnapshot(tableName, shouldSplit);
       result = new ArrayList<>();
 
       job.setMapperClass(TableSnapshotMapper.class);
@@ -151,6 +146,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
       if (condition != null) {
         selectQuery.append(" WHERE " + condition);
       }
+
       if (inputQuery == null)
         inputQuery = selectQuery.toString();
 
@@ -176,12 +172,13 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   private void upsertData(String tableName) throws SQLException {
     Connection conn = DriverManager.getConnection(getUrl());
     PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-    upsertData(stmt, "CCCC", "SSDD", 0001);
-    upsertData(stmt, "CCCC", "HDHG", 0005);
-    upsertData(stmt, "BBBB", "JSHJ", 0002);
-    upsertData(stmt, "AAAA", "JHHD", 0003);
+    upsertData(stmt, "AAAA", "JHHD", 37);
+    upsertData(stmt, "BBBB", "JSHJ", 224);
+    upsertData(stmt, "CCCC", "SSDD", 15);
+    upsertData(stmt, "PPPP", "AJDG", 53);
+    upsertData(stmt, "SSSS", "HSDG", 59);
+    upsertData(stmt, "XXXX", "HDPP", 22);
     conn.commit();
-    timestamp = System.currentTimeMillis();
   }
 
   private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
@@ -191,31 +188,52 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     stmt.execute();
   }
 
-  public void upsertAndSnapshot(String tableName) throws Exception {
+  private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception {
     upsertData(tableName);
 
+    TableName hbaseTableName = TableName.valueOf(tableName);
     Connection conn = DriverManager.getConnection(getUrl());
     HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-    admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
-    // call flush to create new files in the region
-    admin.flush(tableName);
+
+    if (shouldSplit) {
+      splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2);
+    }
+
+    admin.snapshot(SNAPSHOT_NAME, hbaseTableName);
 
     List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
     Assert.assertEquals(tableName, snapshots.get(0).getTable());
 
+    // Capture the snapshot timestamp to use as SCN while reading the table later
+    // Assigning the timestamp value here will make tests less flaky
+    timestamp = System.currentTimeMillis();
+
     // upsert data after snapshot
     PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-    upsertData(stmt, "DDDD", "SNFB", 0004);
+    upsertData(stmt, "DDDD", "SNFB", 45);
     conn.commit();
   }
 
-    public void deleteSnapshot(String tableName) throws Exception {
+  private void splitTableSync(HBaseAdmin admin, TableName hbaseTableName,
+                              byte[] splitPoint, int expectedRegions) throws IOException, InterruptedException {
+    admin.split(hbaseTableName, splitPoint);
+    for (int i = 0; i < 100; i++) {
+      List<HRegionInfo> hRegionInfoList = admin.getTableRegions(hbaseTableName);
+      if (hRegionInfoList.size() >= expectedRegions) {
+        break;
+      }
+      logger.info("Sleeping for 1000 ms while waiting for " + hbaseTableName.getNameAsString() + " to split");
+      Thread.sleep(1000);
+    }
+  }
+
+  private void deleteSnapshot(String tableName) throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl());
                 HBaseAdmin admin =
                         conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
             admin.deleteSnapshot(SNAPSHOT_NAME);
         }
-    }
+  }
 
   public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
 

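The timestamp captured in the hunk above is fed back as Phoenix's SCN so that
later reads see the table exactly as of the snapshot. For background, here is
a minimal sketch of an SCN-pinned read over JDBC; the JDBC URL and the query
are placeholders, and "CurrentSCN" is Phoenix's standard connection property:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    // Pin the connection to the captured timestamp: rows upserted after the
    // snapshot (e.g. the "DDDD" row above) stay invisible on this connection.
    Properties props = new Properties();
    props.setProperty("CurrentSCN", Long.toString(timestamp));
    try (Connection scnConn =
            DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
        // scnConn.createStatement().executeQuery("SELECT * FROM " + tableName);
    }
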
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ccf69f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index 593608f..b4f81ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -45,7 +46,7 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
 
 	private static final MapReduceParallelScanGrouper INSTANCE = new MapReduceParallelScanGrouper();
 
-  public static MapReduceParallelScanGrouper getInstance() {
+    public static MapReduceParallelScanGrouper getInstance() {
 		return INSTANCE;
 	}
 
@@ -79,18 +80,39 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
 		}
 	}
 
+	/**
+	 * Get the list of region locations from the SnapshotManifest.
+	 * BaseResultIterators assumes that regions are sorted using RegionInfo.COMPARATOR.
+	 */
 	private List<HRegionLocation> getRegionLocationsFromManifest(SnapshotManifest manifest) {
 		List<SnapshotProtos.SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
 		Preconditions.checkNotNull(regionManifests);
 
-		List<HRegionLocation> regionLocations = Lists.newArrayListWithCapacity(regionManifests.size());
+		List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
+		List<HRegionLocation> hRegionLocations = Lists.newArrayListWithCapacity(regionManifests.size());
 
 		for (SnapshotProtos.SnapshotRegionManifest regionManifest : regionManifests) {
-			regionLocations.add(new HRegionLocation(
-					HRegionInfo.convert(regionManifest.getRegionInfo()), null));
+			HRegionInfo regionInfo = HRegionInfo.convert(regionManifest.getRegionInfo());
+			if (isValidRegion(regionInfo)) {
+				regionInfos.add(regionInfo);
+			}
+		}
+
+		Collections.sort(regionInfos);
+
+		for (HRegionInfo regionInfo : regionInfos) {
+			hRegionLocations.add(new HRegionLocation(regionInfo, null));
 		}
 
-		return regionLocations;
+		return hRegionLocations;
+	}
+
+	// Exclude offline split parent regions
+	private boolean isValidRegion(HRegionInfo hri) {
+		if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+			return false;
+		}
+		return true;
 	}
 
 	private String getSnapshotName(Configuration conf) {

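Condensed from the hunk above: the grouper now drops offline split parents
from the snapshot manifest, sorts the survivors into HRegionInfo's natural
order (the RegionInfo.COMPARATOR ordering that BaseResultIterators expects),
and only then wraps them as HRegionLocations. A self-contained sketch of that
flow, with an illustrative method name:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.HRegionLocation;

    static List<HRegionLocation> toSortedLocations(List<HRegionInfo> fromManifest) {
        List<HRegionInfo> live = new ArrayList<>(fromManifest.size());
        for (HRegionInfo hri : fromManifest) {
            // An offline split parent's key range is fully covered by its
            // daughters; scanning both is what produced the duplicate rows.
            if (!(hri.isOffline() && (hri.isSplit() || hri.isSplitParent()))) {
                live.add(hri);
            }
        }
        Collections.sort(live); // HRegionInfo implements Comparable
        List<HRegionLocation> locations = new ArrayList<>(live.size());
        for (HRegionInfo hri : live) {
            locations.add(new HRegionLocation(hri, null)); // no server: snapshot read
        }
        return locations;
    }
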
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ccf69f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index 016d3be..c3d75f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
@@ -79,22 +78,31 @@ public class TableSnapshotResultIterator implements ResultIterator {
     RestoreSnapshotHelper.RestoreMetaChanges meta =
         RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs,
             this.rootDir, this.restoreDir, this.snapshotName);
-    List restoredRegions = meta.getRegionsToAdd();
+    List<HRegionInfo> restoredRegions = meta.getRegionsToAdd();
     this.htd = meta.getTableDescriptor();
-    this.regions = new ArrayList(restoredRegions.size());
-    Iterator i$ = restoredRegions.iterator();
-
-    while(i$.hasNext()) {
-      HRegionInfo hri = (HRegionInfo)i$.next();
-      if(CellUtil.overlappingKeys(this.scan.getStartRow(), this.scan.getStopRow(),
-          hri.getStartKey(), hri.getEndKey())) {
-        this.regions.add(hri);
+    this.regions = new ArrayList<>(restoredRegions.size());
+
+    for (HRegionInfo restoredRegion : restoredRegions) {
+      if (isValidRegion(restoredRegion)) {
+        this.regions.add(restoredRegion);
       }
     }
 
     Collections.sort(this.regions);
   }
 
+  /**
+   * Exclude offline split parent regions and
+   * regions that don't intersect with the provided scan
+   */
+  private boolean isValidRegion(HRegionInfo hri) {
+    if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+      return false;
+    }
+    return CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+            hri.getStartKey(), hri.getEndKey());
+  }
+
   public boolean initSnapshotScanner() throws SQLException {
     if (closed) {
       return true;

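The scan-intersection half of the new isValidRegion relies on
CellUtil.overlappingKeys, which treats end keys as exclusive and empty byte
arrays as open-ended. A small illustration with made-up keys:

    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.util.Bytes;

    byte[] scanStart = Bytes.toBytes("BBBB");
    byte[] scanStop  = Bytes.toBytes("DDDD");

    // Region ["CCCC", open-ended) overlaps the scan on ["CCCC","DDDD"): true
    boolean hits = CellUtil.overlappingKeys(scanStart, scanStop,
            Bytes.toBytes("CCCC"), new byte[0]);

    // Region ["DDDD","FFFF") starts exactly at the exclusive stop key: false
    boolean misses = CellUtil.overlappingKeys(scanStart, scanStop,
            Bytes.toBytes("DDDD"), Bytes.toBytes("FFFF"));
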
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ccf69f0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index e80f2c4..94e8036 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -1782,15 +1782,15 @@ public abstract class BaseTest {
     
 
     /**
-     *  Split SYSTEM.CATALOG at the given split point 
+     *  Synchronously split table at the given split point
      */
-    protected static void splitRegion(byte[] splitPoint) throws SQLException, IOException, InterruptedException {
-        HBaseAdmin admin =
+    protected static void splitRegion(TableName fullTableName, byte[] splitPoint) throws SQLException, IOException, InterruptedException {
+         HBaseAdmin admin =
                 driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
-        admin.split(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME, splitPoint);
+        admin.split(fullTableName, splitPoint);
         // make sure the split finishes (there's no synchronous splitting before HBase 2.x)
-        admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME);
-        admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME);
+        admin.disableTable(fullTableName);
+        admin.enableTable(fullTableName);
     }
     
     /**
@@ -1819,7 +1819,7 @@ public abstract class BaseTest {
         AssignmentManager am = master.getAssignmentManager();
         // No need to split on the first splitPoint since the end key of region boundaries are exclusive
         for (int i=1; i<splitPoints.size(); ++i) {
-            splitRegion(splitPoints.get(i));
+            splitRegion(fullTableName, splitPoints.get(i));
         }
         HashMap<ServerName, List<HRegionInfo>> serverToRegionsList = Maps.newHashMapWithExpectedSize(NUM_SLAVES_BASE);
         Deque<ServerName> availableRegionServers = new ArrayDeque<ServerName>(NUM_SLAVES_BASE);
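
With the table name now a parameter, splitRegion is no longer hard-wired to
SYSTEM.CATALOG. A hypothetical call site (the table name and split point are
placeholders):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.util.Bytes;

    // Split an arbitrary test table at row key "m"; the disable/enable pair
    // inside splitRegion blocks until the split completes, since HBase 1.x
    // has no synchronous split API.
    splitRegion(TableName.valueOf("MY_TEST_TABLE"), Bytes.toBytes("m"));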