You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/07 01:25:05 UTC
[1/2] phoenix git commit: PHOENIX-4175 Convert tests using
CURRENT_SCN to not use it when possible
Repository: phoenix
Updated Branches:
refs/heads/master 28aebd6af -> b46cbd375
PHOENIX-4175 Convert tests using CURRENT_SCN to not use it when possible
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b46cbd37
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b46cbd37
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b46cbd37
Branch: refs/heads/master
Commit: b46cbd375e3d2ee9a11644825c13937572c027cd
Parents: 6c5bc3b
Author: James Taylor <ja...@apache.org>
Authored: Wed Sep 6 18:05:42 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Wed Sep 6 18:24:51 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/CreateSchemaIT.java | 26 +++----
.../phoenix/end2end/CustomEntityDataIT.java | 75 ++++++++++++--------
.../apache/phoenix/end2end/UpsertSelectIT.java | 42 +++++++++--
3 files changed, 90 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46cbd37/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
index 09cd810..fe09dcd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
@@ -30,41 +30,31 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
import org.apache.phoenix.schema.SchemaAlreadyExistsException;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
-public class CreateSchemaIT extends BaseClientManagedTimeIT {
+public class CreateSchemaIT extends ParallelStatsDisabledIT {
@Test
public void testCreateSchema() throws Exception {
- long ts = nextTimestamp();
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
- String ddl = "CREATE SCHEMA TEST_SCHEMA";
+ String schemaName = generateUniqueName();
+ String ddl = "CREATE SCHEMA " + schemaName;
try (Connection conn = DriverManager.getConnection(getUrl(), props);
HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
conn.createStatement().execute(ddl);
- assertNotNull(admin.getNamespaceDescriptor("TEST_SCHEMA"));
+ assertNotNull(admin.getNamespaceDescriptor(schemaName));
}
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
- try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
- conn.createStatement().execute(ddl);
- fail();
- } catch (SchemaAlreadyExistsException e) {
- // expected
- }
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts - 20));
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute(ddl);
fail();
- } catch (NewerSchemaAlreadyExistsException e) {
+ } catch (SchemaAlreadyExistsException e) {
// expected
}
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 50));
Connection conn = DriverManager.getConnection(getUrl(), props);
try {
conn.createStatement().execute("CREATE SCHEMA " + SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46cbd37/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
index ad0f308..4af2c5c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.util.TestUtil.CUSTOM_ENTITY_DATA_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.ROW2;
import static org.apache.phoenix.util.TestUtil.ROW5;
import static org.apache.phoenix.util.TestUtil.ROW9;
@@ -32,26 +31,49 @@ import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.util.Properties;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
-public class CustomEntityDataIT extends BaseClientManagedTimeIT {
+public class CustomEntityDataIT extends ParallelStatsDisabledIT {
- protected static void initTableValues(String tenantId, byte[][] splits, long ts) throws Exception {
- ensureTableCreated(getUrl(),CUSTOM_ENTITY_DATA_FULL_NAME,CUSTOM_ENTITY_DATA_FULL_NAME, ts-2);
-
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(url, props);
+ private static void initTableValues(Connection conn, String tenantId, String tableName) throws Exception {
+ String ddl = "create table " + tableName +
+ " (organization_id char(15) not null, \n" +
+ " key_prefix char(3) not null,\n" +
+ " custom_entity_data_id char(12) not null,\n" +
+ " created_by varchar,\n" +
+ " created_date date,\n" +
+ " currency_iso_code char(3),\n" +
+ " deleted char(1),\n" +
+ " division decimal(31,10),\n" +
+ " last_activity date,\n" +
+ " last_update date,\n" +
+ " last_update_by varchar,\n" +
+ " name varchar(240),\n" +
+ " owner varchar,\n" +
+ " record_type_id char(15),\n" +
+ " setup_owner varchar,\n" +
+ " system_modstamp date,\n" +
+ " b.val0 varchar,\n" +
+ " b.val1 varchar,\n" +
+ " b.val2 varchar,\n" +
+ " b.val3 varchar,\n" +
+ " b.val4 varchar,\n" +
+ " b.val5 varchar,\n" +
+ " b.val6 varchar,\n" +
+ " b.val7 varchar,\n" +
+ " b.val8 varchar,\n" +
+ " b.val9 varchar\n" +
+ " CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, custom_entity_data_id)) SPLIT ON ('" + tenantId + "00A','" + tenantId + "00B','" + tenantId + "00C')";
+
+ conn.createStatement().execute(ddl);
// Insert all rows at ts
PreparedStatement stmt = conn.prepareStatement(
- "upsert into " +
- "CORE.CUSTOM_ENTITY_DATA(" +
+ "upsert into " + tableName +
+ "(" +
" ORGANIZATION_ID, " +
" KEY_PREFIX, " +
" CUSTOM_ENTITY_DATA_ID, " +
@@ -154,18 +176,16 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
stmt.execute();
conn.commit();
- conn.close();
}
@Test
public void testUngroupedAggregation() throws Exception {
- long ts = nextTimestamp();
String tenantId = getOrganizationId();
- String query = "SELECT count(1) FROM CORE.CUSTOM_ENTITY_DATA WHERE organization_id=?";
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5
- Connection conn = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ String tableName = generateUniqueName();
+ String query = "SELECT count(1) FROM " + tableName + " WHERE organization_id=?";
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
try {
- initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
+ initTableValues(conn, tenantId, tableName);
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
ResultSet rs = statement.executeQuery();
@@ -179,13 +199,12 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
@Test
public void testScan() throws Exception {
- long ts = nextTimestamp();
String tenantId = getOrganizationId();
- String query = "SELECT CREATED_BY,CREATED_DATE,CURRENCY_ISO_CODE,DELETED,DIVISION,LAST_UPDATE,LAST_UPDATE_BY,NAME,OWNER,SYSTEM_MODSTAMP,VAL0,VAL1,VAL2,VAL3,VAL4,VAL5,VAL6,VAL7,VAL8,VAL9 FROM CORE.CUSTOM_ENTITY_DATA WHERE organization_id=?";
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5
- Connection conn = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ String tableName = generateUniqueName();
+ String query = "SELECT CREATED_BY,CREATED_DATE,CURRENCY_ISO_CODE,DELETED,DIVISION,LAST_UPDATE,LAST_UPDATE_BY,NAME,OWNER,SYSTEM_MODSTAMP,VAL0,VAL1,VAL2,VAL3,VAL4,VAL5,VAL6,VAL7,VAL8,VAL9 FROM " + tableName + " WHERE organization_id=?";
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
try {
- initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
+ initTableValues(conn, tenantId, tableName);
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
ResultSet rs = statement.executeQuery();
@@ -203,14 +222,12 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
@Test
public void testWhereStringConcatExpression() throws Exception {
- long ts = nextTimestamp();
String tenantId = getOrganizationId();
- initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
- String query = "SELECT KEY_PREFIX||CUSTOM_ENTITY_DATA_ID FROM CORE.CUSTOM_ENTITY_DATA where '00A'||val0 LIKE '00A2%'";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String query = "SELECT KEY_PREFIX||CUSTOM_ENTITY_DATA_ID FROM " + tableName + " where '00A'||val0 LIKE '00A2%'";
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
try {
+ initTableValues(conn, tenantId, tableName);
PreparedStatement statement = conn.prepareStatement(query);
ResultSet rs=statement.executeQuery();
assertTrue (rs.next());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b46cbd37/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index eb8df18..7fb2751 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -21,7 +21,6 @@ import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.util.TestUtil.A_VALUE;
import static org.apache.phoenix.util.TestUtil.B_VALUE;
-import static org.apache.phoenix.util.TestUtil.CUSTOM_ENTITY_DATA_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.C_VALUE;
import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
import static org.apache.phoenix.util.TestUtil.ROW6;
@@ -100,23 +99,54 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
long ts = nextTimestamp();
String tenantId = getOrganizationId();
byte[][] splits = getDefaultSplits(tenantId);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
String aTable = initATableValues(tenantId, saltTable ? null : splits, null, ts-1, getUrl(), saltTable ? "salt_buckets = 2" : null);
String customEntityTable = generateUniqueName();
- ensureTableCreated(getUrl(), customEntityTable, CUSTOM_ENTITY_DATA_FULL_NAME, null, ts-1, saltTable ? "salt_buckets = 2" : null);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts - 1));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String ddl = "create table " + customEntityTable +
+ " (organization_id char(15) not null, \n" +
+ " key_prefix char(3) not null,\n" +
+ " custom_entity_data_id char(12) not null,\n" +
+ " created_by varchar,\n" +
+ " created_date date,\n" +
+ " currency_iso_code char(3),\n" +
+ " deleted char(1),\n" +
+ " division decimal(31,10),\n" +
+ " last_activity date,\n" +
+ " last_update date,\n" +
+ " last_update_by varchar,\n" +
+ " name varchar(240),\n" +
+ " owner varchar,\n" +
+ " record_type_id char(15),\n" +
+ " setup_owner varchar,\n" +
+ " system_modstamp date,\n" +
+ " b.val0 varchar,\n" +
+ " b.val1 varchar,\n" +
+ " b.val2 varchar,\n" +
+ " b.val3 varchar,\n" +
+ " b.val4 varchar,\n" +
+ " b.val5 varchar,\n" +
+ " b.val6 varchar,\n" +
+ " b.val7 varchar,\n" +
+ " b.val8 varchar,\n" +
+ " b.val9 varchar\n" +
+ " CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, custom_entity_data_id)) " + (saltTable ? "salt_buckets = 2" : "");
+ conn.createStatement().execute(ddl);
+ conn.close();
+
String indexName = generateUniqueName();
if (createIndex) {
- Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); // Execute at timestamp 1
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + aTable + "(a_string)" );
conn.close();
}
PreparedStatement upsertStmt;
- Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(3)); // Trigger multiple batches
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn = DriverManager.getConnection(getUrl(), props);
conn.setAutoCommit(true);
String upsert = "UPSERT INTO " + customEntityTable + "(custom_entity_data_id, key_prefix, organization_id, created_by) " +
"SELECT substr(entity_id, 4), substr(entity_id, 1, 3), organization_id, a_string FROM " + aTable + " WHERE ?=a_string";
[2/2] phoenix git commit: PHOENIX-4173 Ensure that the rebuild fails
if an index that transitions back to disabled while rebuilding
Posted by ja...@apache.org.
PHOENIX-4173 Ensure that the rebuild fails if an index that transitions back to disabled while rebuilding
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6c5bc3bb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6c5bc3bb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6c5bc3bb
Branch: refs/heads/master
Commit: 6c5bc3bba7732357bf3fc4ab39e7fda10e97539e
Parents: 28aebd6
Author: James Taylor <ja...@apache.org>
Authored: Wed Sep 6 12:46:34 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Wed Sep 6 18:24:51 2017 -0700
----------------------------------------------------------------------
.../end2end/index/PartialIndexRebuilderIT.java | 151 ++++++++++++++++++-
1 file changed, 143 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c5bc3bb/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index cacf0fa..067f50f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@@ -30,7 +31,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
@@ -38,10 +39,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PTable;
@@ -634,6 +638,94 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new CountDownLatch(1);
+ private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new CountDownLatch(1);
+
+
+ @Test
+ public void testDisableIndexDuringRebuild() throws Throwable {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+ PTableKey key = new PTableKey(null,fullTableName);
+ final MyClock clock = new MyClock(1000);
+ EnvironmentEdgeManager.injectEdge(clock);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+ clock.time += 100;
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)");
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')");
+ conn.commit();
+ clock.time += 100;
+ try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
+ // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
+ IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+ clock.time += 100;
+ long disableTime = clock.currentTime();
+ // Set some values while index disabled
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')");
+ conn.commit();
+ clock.time += 100;
+ assertTrue(hasDisabledIndex(metaCache, key));
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')");
+ conn.commit();
+ clock.time += 100;
+ // Will cause partial index rebuilder to be triggered
+ IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+ final CountDownLatch doneSignal = new CountDownLatch(1);
+ advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
+ // Set some values while index is in INACTIVE state
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
+ conn.commit();
+ doneSignal.await(30, TimeUnit.SECONDS);
+ // Install coprocessor that will simulate an index write failure during index rebuild
+ addWriteFailingCoprocessor(conn,fullIndexName);
+ clock.time += WAIT_AFTER_DISABLED;
+ doneSignal.await(30, TimeUnit.SECONDS);
+ WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
+ // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
+ IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+ clock.time += 100;
+ disableTime = clock.currentTime();
+ // Set some values while index disabled
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bbbbb', '11','yy')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','cccccc','222','zzz')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ddddddd','3333','zzzz')");
+ conn.commit();
+ clock.time += 100;
+ // Simulates another write failure. Should cause current run of rebuilder to fail and retry again later
+ IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+ removeWriteFailingCoprocessor(conn,fullIndexName);
+ WAIT_FOR_INDEX_WRITE.countDown();
+ }
+ // Original rebuilder should have failed
+
+ advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
+ clock.time += WAIT_AFTER_DISABLED * 2;
+ // Enough time has passed, so rebuild will start now
+ TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ clock.time += 100;
+ IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+ } finally {
+ EnvironmentEdgeManager.injectEdge(null);
+ }
+ }
+
@Test
public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
String schemaName = generateUniqueName();
@@ -751,15 +843,58 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
t.start();
}
- public static class DelayingRegionObserver extends SimpleRegionObserver {
- @Override
- public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
- try {
- Thread.sleep(Math.abs(RAND.nextInt()) % 10);
- } catch (InterruptedException e) {
+ private static void addWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
+ int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+ descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), null, priority, null);
+ int numTries = 10;
+ try (HBaseAdmin admin = services.getAdmin()) {
+ admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+ while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+ && numTries > 0) {
+ numTries--;
+ if (numTries == 0) {
+ throw new Exception(
+ "Check to detect if delaying co-processor was added failed after "
+ + numTries + " retries.");
+ }
+ Thread.sleep(1000);
}
-
}
}
+ private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+ descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName());
+ int numTries = 10;
+ try (HBaseAdmin admin = services.getAdmin()) {
+ admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+ while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+ && numTries > 0) {
+ numTries--;
+ if (numTries == 0) {
+ throw new Exception(
+ "Check to detect if delaying co-processor was removed failed after "
+ + numTries + " retries.");
+ }
+ Thread.sleep(1000);
+ }
+ }
+ }
+
+ public static class WriteFailingRegionObserver extends SimpleRegionObserver {
+ @Override
+ public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ WAIT_FOR_REBUILD_TO_START.countDown();
+ try {
+ WAIT_FOR_INDEX_WRITE.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new IOException(e);
+ }
+ }
+ }
+
}