You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2018/09/28 04:16:11 UTC
phoenix git commit: PHOENIX-4930 Add test for ORDER BY and LIMIT
queries during a split
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.2 bd5aa2d9c -> f4ebaff0d
PHOENIX-4930 Add test for ORDER BY and LIMIT queries during a split
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f4ebaff0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f4ebaff0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f4ebaff0
Branch: refs/heads/4.x-HBase-1.2
Commit: f4ebaff0df48e8801c11d1411f3eb205262c5c9a
Parents: bd5aa2d
Author: Thomas D'Silva <td...@apache.org>
Authored: Sat Sep 15 14:27:14 2018 -0700
Committer: Thomas D'Silva <td...@apache.org>
Committed: Thu Sep 27 21:13:34 2018 -0700
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/SplitIT.java | 260 +++++++++++++++++++
1 file changed, 260 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4ebaff0/phoenix-core/src/it/java/org/apache/phoenix/end2end/SplitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SplitIT.java
new file mode 100644
index 0000000..73cf1f0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SplitIT.java
@@ -0,0 +1,260 @@
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class SplitIT extends BaseUniqueNamesOwnClusterIT {
+ private static final String SPLIT_TABLE_NAME_PREFIX = "SPLIT_TABLE_";
+ private static boolean tableWasSplitDuringScannerNext = false;
+ private static byte[] splitPoint = null;
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+ serverProps.put("hbase.coprocessor.region.classes", TestRegionObserver.class.getName());
+ serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(3);
+ clientProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(10));
+ // read rows in batches 3 at time
+ clientProps.put(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ public static class TestRegionObserver extends BaseRegionObserver {
+
+ @Override
+ public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final InternalScanner s, final List<Result> results, final int limit,
+ final boolean hasMore) throws IOException {
+ Region region = c.getEnvironment().getRegion();
+ String tableName = region.getRegionInfo().getTable().getNameAsString();
+ if (tableName.startsWith(SPLIT_TABLE_NAME_PREFIX) && results.size()>1) {
+ int pk = (Integer)PInteger.INSTANCE.toObject(results.get(0).getRow());
+ // split when row 10 is read
+ if (pk==10 && !tableWasSplitDuringScannerNext) {
+ try {
+ // split on the first row being scanned if splitPoint is null
+ splitPoint = splitPoint!=null ? splitPoint : results.get(0).getRow();
+ splitTable(splitPoint, tableName);
+ tableWasSplitDuringScannerNext = true;
+ }
+ catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ return hasMore;
+ }
+
+ }
+
+ public static void splitTable(byte[] splitPoint, String tableName) throws SQLException, IOException {
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ int nRegions = admin.getTableRegions(tableName.getBytes()).size();
+ int nInitialRegions = nRegions;
+ admin.split(tableName.getBytes(), splitPoint);
+ admin.disableTable(tableName);
+ admin.enableTable(tableName);
+ nRegions = admin.getTableRegions(tableName.getBytes()).size();
+ if (nRegions == nInitialRegions)
+ throw new IOException("Could not split for " + tableName);
+ }
+
+ /**
+ * Runs an UPSERT SELECT into the same table while the table is split
+ */
+ public void helpTestUpsertSelectWithSplit(boolean splitTableBeforeUpsertSelect) throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.setAutoCommit(true);
+ String keys = generateUniqueName();
+ conn.createStatement().execute("CREATE SEQUENCE " + keys + " CACHE 1000");
+ String tableName = SPLIT_TABLE_NAME_PREFIX + generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, val INTEGER)");
+
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES (NEXT VALUE FOR " + keys + ",1)");
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR " + keys + ", pk FROM " + tableName);
+ for (int i=0; i<7; i++) {
+ if (splitTableBeforeUpsertSelect) {
+ // split the table and then run the UPSERT SELECT
+ splitTable(PInteger.INSTANCE.toBytes(Math.pow(2, i)), tableName);
+ }
+ int upsertCount = stmt.executeUpdate();
+ assertEquals((int) Math.pow(2, i), upsertCount);
+ }
+ conn.close();
+ }
+
+ /**
+ * Runs SELECT to a table that is being written to while a SPLIT happens
+ */
+ public void helpTestSelectWithSplit(boolean splitTableBeforeSelect, boolean orderBy, boolean limit) throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.setAutoCommit(true);
+ String tableName = SPLIT_TABLE_NAME_PREFIX + generateUniqueName();
+ int pk = 1;
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, val INTEGER)");
+
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES (1,1)");
+ PreparedStatement stmt = conn.prepareStatement(" UPSERT INTO " + tableName + " VALUES (?,?) ");
+ for (int i=0; i<5; i++) {
+ if (splitTableBeforeSelect) {
+ // split the table and then run the SELECT
+ splitTable(PInteger.INSTANCE.toBytes(Math.pow(2, i)), tableName);
+ }
+
+ int count = 0;
+ while (count<2) {
+ String query = "SELECT * FROM " + tableName + (orderBy ? " ORDER BY val" : "") + (limit ? " LIMIT 32" : "");
+ try {
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ while (rs.next()) {
+ stmt.setInt(1, ++pk);
+ stmt.setInt(2, rs.getInt(1));
+ stmt.execute();
+ }
+ break;
+ } catch (StaleRegionBoundaryCacheException e) {
+ if (!orderBy)
+ fail("Simple selects should not check for splits, they let HBase restart the scan");
+ if (count>0)
+ throw e;
+ ++count;
+ }
+ }
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(1) FROM " + tableName);
+ assertTrue(rs.next());
+ int rowCount = rs.getInt(1);
+ assertFalse(rs.next());
+
+ // for ORDER BY a StaleRegionBoundaryException is thrown when a sp[it happens
+ if (orderBy) {
+ // if the table splits before the SELECT we always detect this so we never see rows written after the scan started
+ if (splitTableBeforeSelect)
+ assertEquals((int) Math.pow(2, i + 1), rowCount);
+ // else we see rows written after the SELECT started
+ else if (i == 4) {
+ assert ((int) Math.pow(2, i + 1) < rowCount);
+ }
+ }
+ // verify that we will see more rows written after the scan started after a split happens for simple select
+ else if ((splitTableBeforeSelect && i == 3) || i == 4) {
+ assert ((int) Math.pow(2, i + 1) < rowCount);
+ }
+ }
+ conn.close();
+ }
+
+ @Test
+ public void testUpsertSelectAfterTableSplit() throws Exception {
+ // no need to split the table during the UPSERT SELECT for this test so just set the flag to true
+ tableWasSplitDuringScannerNext = true;
+ helpTestUpsertSelectWithSplit(true);
+ }
+
+ @Test
+ public void testUpsertSelectDuringSplitOnRowScanned() throws Exception {
+ tableWasSplitDuringScannerNext = false;
+ splitPoint = null;
+ helpTestUpsertSelectWithSplit(false);
+ }
+
+ @Test
+ public void testUpsertSelectDuringSplitOnRowInMiddleOfRegionBeingScanned() throws Exception {
+ tableWasSplitDuringScannerNext = false;
+ // when the table has 16 rows, split the table in the middle of the region on row 14
+ splitPoint = PInteger.INSTANCE.toBytes(14);
+ helpTestUpsertSelectWithSplit(false);
+ }
+
+ @Test
+ public void testSimpleSelectAfterTableSplit() throws Exception {
+ // no need to split the table while running the SELECT and the UPSERT so just set the flag to true
+ tableWasSplitDuringScannerNext = true;
+ helpTestSelectWithSplit(true, false, false);
+ }
+
+ @Test
+ public void testSimpleSelectDuringSplitOnRowScanned() throws Exception {
+ tableWasSplitDuringScannerNext = false;
+ splitPoint = null;
+ helpTestSelectWithSplit(false, false, false);
+ }
+
+ @Test
+ public void testSimpleSelectDuringSplitOnRowInMiddleOfRegionBeingScanned() throws Exception {
+ tableWasSplitDuringScannerNext = false;
+ // when the table has 16 rows, split the table in the middle of the region on row 14
+ splitPoint = PInteger.INSTANCE.toBytes(14);
+ helpTestSelectWithSplit(false, false, false);
+ }
+
+ @Test
+ public void testOrderByAfterTableSplit() throws Exception {
+ // no need to split the table while running the SELECT and the UPSERT so just set the flag to true
+ tableWasSplitDuringScannerNext = true;
+ helpTestSelectWithSplit(true, true, false);
+ }
+
+ @Test
+ public void testOrderByDuringSplitOnRowScanned() throws Exception {
+ tableWasSplitDuringScannerNext = false;
+ splitPoint = null;
+ helpTestSelectWithSplit(false, true, false);
+ }
+
+ @Test
+ public void testOrderByDuringSplitOnRowInMiddleOfRegionBeingScanned() throws Exception {
+ tableWasSplitDuringScannerNext = false;
+ // when the table has 16 rows, split the table in the middle of the region on row 14
+ splitPoint = PInteger.INSTANCE.toBytes(14);
+ helpTestSelectWithSplit(false, true, false);
+ }
+
+ @Test
+ public void testLimitAfterTableSplit() throws Exception {
+ // no need to split the table while running the SELECT and the UPSERT so just set the flag to true
+ tableWasSplitDuringScannerNext = true;
+ helpTestSelectWithSplit(true, false, true);
+ }
+
+ @Test
+ public void testLimitDuringSplitOnRowScanned() throws Exception {
+ tableWasSplitDuringScannerNext = false;
+ splitPoint = null;
+ helpTestSelectWithSplit(false, false, true);
+ }
+
+ @Test
+ public void testLimitDuringSplitOnRowInMiddleOfRegionBeingScanned() throws Exception {
+ tableWasSplitDuringScannerNext = false;
+ // when the table has 16 rows, split the table in the middle of the region on row 14
+ splitPoint = PInteger.INSTANCE.toBytes(14);
+ helpTestSelectWithSplit(false, false, true);
+ }
+}