You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/19 21:02:17 UTC

[3/5] phoenix git commit: PHOENIX-4600 Add retry logic for partial index rebuilder writes

PHOENIX-4600 Add retry logic for partial index rebuilder writes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/07436780
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/07436780
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/07436780

Branch: refs/heads/4.x-cdh5.12
Commit: 074367804e5d3409988b237554161b722e5b8b35
Parents: 803abe7
Author: Vincent Poon <vi...@apache.org>
Authored: Thu Apr 19 18:30:14 2018 +0100
Committer: Pedro Boado <pb...@apache.org>
Committed: Thu Apr 19 21:58:11 2018 +0100

----------------------------------------------------------------------
 .../end2end/index/MutableIndexRebuilderIT.java  | 143 +++++++++++++++++++
 .../UngroupedAggregateRegionObserver.java       |  32 +++--
 2 files changed, 160 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/07436780/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java
new file mode 100644
index 0000000..8420f16
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java
@@ -0,0 +1,143 @@
+package org.apache.phoenix.end2end.index;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.RunUntilFailure;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Integration test for how the partial index rebuilder behaves when its index writes fail.
+ * <p>
+ * Rebuild passes are triggered synchronously from the test by constructing a
+ * {@link BuildIndexScheduleTask} and calling {@code run()} on it, using the
+ * {@link MetaDataRegionObserver} coprocessor environment of a SYSTEM.CATALOG region.
+ */
+@RunWith(RunUntilFailure.class)
+public class MutableIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
+    // Value for INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB
+    private static final int WAIT_AFTER_DISABLED = 0;
+    // Value for INDEX_FAILURE_HANDLING_REBUILD_PERIOD (batch at 50 seconds)
+    private static final long REBUILD_PERIOD = 50000;
+    // Value for INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB
+    private static final long REBUILD_INTERVAL = 2000;
+    // Coprocessor environment handed to every BuildIndexScheduleTask created by this test;
+    // assigned in the test method after the cluster has been started.
+    private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
+
+    /**
+     * Tests that the index rebuilder attempts a failing index write exactly as many times as
+     * configured through {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}.
+     * @throws Throwable if cluster setup, a rebuild run, or one of the assertions fails
+     */
+    @Test
+    public void testRebuildRetriesSuccessful() throws Throwable {
+        int numberOfRetries = 5;
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, Long.toString(REBUILD_INTERVAL));
+        serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
+        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numberOfRetries + "");
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        indexRebuildTaskRegionEnvironment =
+                (RegionCoprocessorEnvironment) getUtility()
+                .getRSForFirstRegionInTable(
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                .get(0).getCoprocessorHost()
+                .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        MetaDataRegionObserver.initRebuildIndexConnectionProps(
+            indexRebuildTaskRegionEnvironment.getConfiguration());
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String schemaName = generateUniqueName();
+            String tableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+            final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) DISABLE_INDEX_ON_WRITE_FAILURE = TRUE");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+            // Mark the index as disabled directly in the catalog table, and release the table
+            // handle once the state has been written instead of leaking it.
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            try {
+                IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE);
+            } finally {
+                metaTable.close();
+            }
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')");
+            conn.commit();
+            // The attempt counter is static and therefore outlives a single test execution;
+            // start from zero before the failing observer is installed so the exact-count
+            // assertion below only sees attempts made during this execution.
+            WriteFailingRegionObserver.attempts.set(0);
+            // Simulate write failure when rebuilder runs
+            TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class);
+            waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE);
+            // rebuild writes should retry for exactly the configured number of times
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+            try {
+                Future<Boolean> future = executor.submit(new Callable<Boolean>() {
+                    @Override
+                    public Boolean call() throws Exception {
+                        runIndexRebuilder(fullTableName);
+                        return true;
+                    }});
+                // Bound the wait so a rebuilder that never gives up fails the test instead of hanging it
+                assertTrue(future.get(120, TimeUnit.SECONDS));
+                assertEquals(numberOfRetries, WriteFailingRegionObserver.attempts.get());
+            } finally {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Runs the index rebuilder for the given table and checks the index state afterwards,
+     * making at most two such attempts with a one second pause after an unsuccessful one.
+     * Fails the test if the expected state is not observed.
+     * @param conn connection used to read the index state
+     * @param fullTableName data table passed to the rebuilder
+     * @param fullIndexName index whose state is checked
+     * @param expectedIndexState state the index must reach
+     * @throws InterruptedException if interrupted while sleeping between attempts
+     * @throws SQLException if the rebuilder run or the state lookup fails
+     */
+    public static void waitForIndexState(Connection conn, String fullTableName, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
+        int nRetries = 2;
+        PIndexState actualIndexState = null;
+        do {
+            runIndexRebuilder(fullTableName);
+            if ((actualIndexState = TestUtil.getIndexState(conn, fullIndexName)) == expectedIndexState) {
+                return;
+            }
+            Thread.sleep(1000);
+        } while (--nRetries > 0);
+        fail("Expected index state of " + expectedIndexState + ", but was " + actualIndexState);
+    }
+
+    /** Runs one rebuilder pass restricted to a single table. */
+    private static void runIndexRebuilder(String table) throws InterruptedException, SQLException {
+        runIndexRebuilder(Collections.<String>singletonList(table));
+    }
+
+    /**
+     * Runs one rebuilder pass for the given tables on the calling thread by invoking
+     * {@code run()} on a freshly created {@link BuildIndexScheduleTask}.
+     */
+    private static void runIndexRebuilder(List<String> tables) throws InterruptedException, SQLException {
+        BuildIndexScheduleTask task =
+                new MetaDataRegionObserver.BuildIndexScheduleTask(
+                        indexRebuildTaskRegionEnvironment, tables);
+        task.run();
+    }
+
+    /**
+     * Region observer that counts every {@code postBatchMutate} invocation and then fails it,
+     * so that each batch write to the table it is attached to is reported as failed.
+     */
+    public static class WriteFailingRegionObserver extends SimpleRegionObserver {
+        // Never reassigned, so final rather than volatile; AtomicInteger provides the visibility.
+        public static final AtomicInteger attempts = new AtomicInteger(0);
+        @Override
+        public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+            attempts.incrementAndGet();
+            throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/07436780/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 31b512a..5a26087 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -243,6 +243,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
     }
 
+    /**
+     * Commits the given mutations to the region through {@code commitBatch}. If that commit
+     * throws an {@link IOException}, the mutations, the exception and a command that re-runs
+     * the very same commit are handed to {@code handleIndexWriteException}; whether and how
+     * often that handler re-executes the command is decided there, not in this method.
+     *
+     * @param region region the mutations are written to
+     * @param localRegionMutations mutations to commit
+     * @param blockingMemstoreSize passed through unchanged to {@code commitBatch}
+     * @throws IOException if the failure handler propagates one
+     */
+    private void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
+        // Command the failure handler can invoke to repeat exactly the commit attempted below.
+        final MutateCommand retryCommit = new MutateCommand() {
+            @Override
+            public void doMutation() throws IOException {
+                commitBatch(region, localRegionMutations, blockingMemstoreSize);
+            }
+        };
+        try {
+            commitBatch(region, localRegionMutations, blockingMemstoreSize);
+        } catch (IOException firstFailure) {
+            handleIndexWriteException(localRegionMutations, firstFailure, retryCommit);
+        }
+    }
+
     private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
       if (mutations.isEmpty()) {
           return;
@@ -251,7 +264,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         Mutation[] mutationArray = new Mutation[mutations.size()];
       // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
       // flush happen which decrease the memstore size and then writes allowed on the region.
-      for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
+      for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
           try {
               checkForRegionClosing();
               Thread.sleep(100);
@@ -892,16 +905,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
         separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
             isPKChanging);
-        try {
-            commitBatch(region, localRegionMutations, blockingMemStoreSize);
-        } catch (IOException e) {
-            handleIndexWriteException(localRegionMutations, e, new MutateCommand() {
-                @Override
-                public void doMutation() throws IOException {
-                    commitBatch(region, localRegionMutations, blockingMemStoreSize);
-                }
-            });
-        }
+        commitBatchWithRetries(region, localRegionMutations, blockingMemStoreSize);
         try {
             commitBatchWithHTable(targetHTable, remoteRegionMutations);
         } catch (IOException e) {
@@ -1069,8 +1073,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             }
                         }
                         if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
-                                    HConstants.NO_NONCE);
+                            commitBatchWithRetries(region, mutations, -1);
                             uuidValue = ServerCacheClient.generateId();
                             mutations.clear();
                         }
@@ -1079,8 +1082,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
-                    region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
-                            HConstants.NO_NONCE);
+                    commitBatchWithRetries(region, mutations, -1);
                 }
             }
         } catch (IOException e) {