You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vi...@apache.org on 2018/04/19 19:03:49 UTC
phoenix git commit: PHOENIX-4600 Add retry logic for partial index
rebuilder writes
Repository: phoenix
Updated Branches:
refs/heads/master fc6cf43a4 -> b768900d9
PHOENIX-4600 Add retry logic for partial index rebuilder writes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b768900d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b768900d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b768900d
Branch: refs/heads/master
Commit: b768900d9f9699fad612acc643866e3e420e7ce0
Parents: fc6cf43
Author: Vincent Poon <vi...@apache.org>
Authored: Thu Apr 19 10:30:14 2018 -0700
Committer: Vincent Poon <vi...@apache.org>
Committed: Thu Apr 19 12:03:34 2018 -0700
----------------------------------------------------------------------
.../end2end/index/MutableIndexRebuilderIT.java | 143 +++++++++++++++++++
.../UngroupedAggregateRegionObserver.java | 32 +++--
2 files changed, 160 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b768900d/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java
new file mode 100644
index 0000000..8420f16
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java
@@ -0,0 +1,143 @@
+package org.apache.phoenix.end2end.index;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.RunUntilFailure;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.Maps;
+
+@RunWith(RunUntilFailure.class)
+public class MutableIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
+ private static final int WAIT_AFTER_DISABLED = 0;
+ private static final long REBUILD_PERIOD = 50000;
+ private static final long REBUILD_INTERVAL = 2000;
+ private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
+
+ /**
+ * Tests that the index rebuilder retries for exactly the configured # of retries
+ * @throws Exception
+ */
+ @Test
+ public void testRebuildRetriesSuccessful() throws Throwable {
+ int numberOfRetries = 5;
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, Long.toString(REBUILD_INTERVAL));
+ serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
+ serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numberOfRetries + "");
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ indexRebuildTaskRegionEnvironment =
+ (RegionCoprocessorEnvironment) getUtility()
+ .getRSForFirstRegionInTable(
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+ .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+ .get(0).getCoprocessorHost()
+ .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+ MetaDataRegionObserver.initRebuildIndexConnectionProps(
+ indexRebuildTaskRegionEnvironment.getConfiguration());
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) DISABLE_INDEX_ON_WRITE_FAILURE = TRUE");
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+ HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE);
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')");
+ conn.commit();
+ // Simulate write failure when rebuilder runs
+ TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class);
+ waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE);
+ // rebuild writes should retry for exactly the configured number of times
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ Future<Boolean> future = executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ runIndexRebuilder(fullTableName);
+ return true;
+ }});
+ assertTrue(future.get(120, TimeUnit.SECONDS));
+ assertEquals(numberOfRetries, WriteFailingRegionObserver.attempts.get());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ public static void waitForIndexState(Connection conn, String fullTableName, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
+ int nRetries = 2;
+ PIndexState actualIndexState = null;
+ do {
+ runIndexRebuilder(fullTableName);
+ if ((actualIndexState = TestUtil.getIndexState(conn, fullIndexName)) == expectedIndexState) {
+ return;
+ }
+ Thread.sleep(1000);
+ } while (--nRetries > 0);
+ fail("Expected index state of " + expectedIndexState + ", but was " + actualIndexState);
+ }
+
+ private static void runIndexRebuilder(String table) throws InterruptedException, SQLException {
+ runIndexRebuilder(Collections.<String>singletonList(table));
+ }
+
+ private static void runIndexRebuilder(List<String> tables) throws InterruptedException, SQLException {
+ BuildIndexScheduleTask task =
+ new MetaDataRegionObserver.BuildIndexScheduleTask(
+ indexRebuildTaskRegionEnvironment, tables);
+ task.run();
+ }
+
+ public static class WriteFailingRegionObserver extends SimpleRegionObserver {
+ public static volatile AtomicInteger attempts = new AtomicInteger(0);
+ @Override
+ public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ attempts.incrementAndGet();
+ throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b768900d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d202193..abdcf72 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -243,6 +243,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
}
+ private void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
+ try {
+ commitBatch(region, localRegionMutations, blockingMemstoreSize);
+ } catch (IOException e) {
+ handleIndexWriteException(localRegionMutations, e, new MutateCommand() {
+ @Override
+ public void doMutation() throws IOException {
+ commitBatch(region, localRegionMutations, blockingMemstoreSize);
+ }
+ });
+ }
+ }
+
private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
if (mutations.isEmpty()) {
return;
@@ -251,7 +264,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
Mutation[] mutationArray = new Mutation[mutations.size()];
// When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
// flush happen which decrease the memstore size and then writes allowed on the region.
- for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
+ for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
try {
checkForRegionClosing();
Thread.sleep(100);
@@ -892,16 +905,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
isPKChanging);
- try {
- commitBatch(region, localRegionMutations, blockingMemStoreSize);
- } catch (IOException e) {
- handleIndexWriteException(localRegionMutations, e, new MutateCommand() {
- @Override
- public void doMutation() throws IOException {
- commitBatch(region, localRegionMutations, blockingMemStoreSize);
- }
- });
- }
+ commitBatchWithRetries(region, localRegionMutations, blockingMemStoreSize);
try {
commitBatchWithHTable(targetHTable, remoteRegionMutations);
} catch (IOException e) {
@@ -1069,8 +1073,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ commitBatchWithRetries(region, mutations, -1);
uuidValue = ServerCacheClient.generateId();
mutations.clear();
}
@@ -1079,8 +1082,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} while (hasMore);
if (!mutations.isEmpty()) {
- region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ commitBatchWithRetries(region, mutations, -1);
}
}
} catch (IOException e) {