You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/26 20:14:49 UTC
[4/4] phoenix git commit: PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)
PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6f65a793
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6f65a793
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6f65a793
Branch: refs/heads/4.x-HBase-1.2
Commit: 6f65a7935b640969e570b870de9fa59e2a5bca67
Parents: 4354a2c
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Sep 26 09:31:44 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 13:11:12 2017 -0700
----------------------------------------------------------------------
.../UpsertSelectOverlappingBatchesIT.java | 239 +++++++++++++++----
.../UngroupedAggregateRegionObserver.java | 23 +-
2 files changed, 209 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f65a793/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index 53346b9..fbf3231 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.execute;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
@@ -32,25 +33,43 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT {
-
+ private static final Logger logger = LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class);
+ private Properties props;
+ private static volatile String dataTable;
+ private String index;
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
@@ -60,7 +79,12 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
-
+
+    /**
+     * Shuts down the HBase mini cluster obtained from {@code getUtility()} once every test in
+     * this class has run.
+     * <p>
+     * NOTE(review): confirm the base class does not also shut the cluster down in its own
+     * {@code @AfterClass} method.
+     */
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        HBaseTestingUtility utility = getUtility();
+        utility.shutdownMiniCluster();
+    }
+
private class UpsertSelectRunner implements Callable<Boolean> {
private final String dataTable;
private final int minIndex;
@@ -89,58 +113,185 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
return true;
}
}
-
}
-
+
+    /**
+     * Repeatedly runs the wrapped {@link UpsertSelectRunner} until the executing thread is
+     * interrupted (for example by {@link ExecutorService#shutdownNow()}). Any failure that is
+     * not caused by an interrupt is logged and the loop continues, so write load is sustained.
+     */
+    private static class UpsertSelectLooper implements Runnable {
+        private final UpsertSelectRunner runner;
+
+        public UpsertSelectLooper(UpsertSelectRunner runner) {
+            this.runner = runner;
+        }
+
+        @Override
+        public void run() {
+            // Check the interrupt flag on every pass: call() can complete normally even after
+            // this thread has been interrupted, and must not keep the executor alive.
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    runner.call();
+                } catch (Exception e) {
+                    // indexOfType also matches subclasses; HBase clients may surface an interrupt
+                    // as InterruptedIOException (an IOException) rather than InterruptedException.
+                    if (ExceptionUtils.indexOfType(e, InterruptedException.class) != -1
+                            || ExceptionUtils.indexOfType(e, java.io.InterruptedIOException.class) != -1) {
+                        logger.info("Interrupted, exiting", e);
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                    logger.error("Hit exception while writing", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Disables the slow-mutation hook and creates a fresh data table, with an index on v1,
+     * holding 100 rows, so that every test starts from the same state.
+     */
+    @Before
+    public void setup() throws Exception {
+        SlowBatchRegionObserver.SLOW_MUTATE = false;
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        // dataTable is static so the static SlowBatchRegionObserver can limit its slowdown to
+        // this test's table
+        dataTable = generateUniqueName();
+        index = "IDX_" + dataTable;
+        try (Connection conn = driver.connect(url, props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTable
+                    + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            // create the index while the data table is still empty
+            conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
+            conn.setAutoCommit(false);
+            try (PreparedStatement stmt =
+                    conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)")) {
+                for (int i = 0; i < 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, "v1" + i);
+                    stmt.setString(3, "v2" + i);
+                    stmt.execute();
+                }
+            }
+            conn.commit();
+        }
+    }
+
@Test
public void testUpsertSelectSameBatchConcurrently() throws Exception {
- final String dataTable = generateUniqueName();
- final String index = "IDX_" + dataTable;
- // create the table and ensure its empty
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = driver.connect(url, props);
- conn.createStatement()
- .execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
- // create the index and ensure its empty as well
- conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
-
- conn = DriverManager.getConnection(getUrl(), props);
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
- conn.setAutoCommit(false);
- for (int i = 0; i < 100; i++) {
- stmt.setInt(1, i);
- stmt.setString(2, "v1" + i);
- stmt.setString(3, "v2" + i);
- stmt.execute();
- }
- conn.commit();
-
- int numUpsertSelectRunners = 5;
- ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
- CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
- List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
- // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
- futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
- // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
- for (int i = 0; i < 100; i += 25) {
- futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
- }
- int received = 0;
- while (received < futures.size()) {
- Future<Boolean> resultFuture = completionService.take();
- Boolean result = resultFuture.get();
- received++;
- assertTrue(result);
+        // The data table and its rows are created by setup(); this test only drives the
+        // concurrent UPSERT SELECTs and checks that every one of them reports success.
+        int numUpsertSelectRunners = 5;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try {
+            CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
+            List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+            // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
+            futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
+            // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
+            for (int i = 0; i < 100; i += 25) {
+                futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
+            }
+            int received = 0;
+            while (received < futures.size()) {
+                Future<Boolean> resultFuture = completionService.take();
+                Boolean result = resultFuture.get();
+                received++;
+                assertTrue(result);
+            }
+        } finally {
+            // always release the worker threads, even when an assertion above fails
+            exec.shutdownNow();
}
- exec.shutdownNow();
- conn.close();
}
+
+    /**
+     * Tests that splitting a region is not blocked indefinitely by UPSERT SELECT load.
+     * <p>
+     * Writer threads keep issuing slow UPSERT SELECTs against the data table while this test
+     * repeatedly requests a split of the table's single region. It waits up to 30 seconds,
+     * polling once per second, for the table to report more than one region.
+     */
+    @Test
+    public void testSplitDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try {
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // keep running slow upsert selects, staggering the writers' start times
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300);
+            }
+
+            // keep trying to split the region
+            final HBaseTestingUtility utility = getUtility();
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final TableName dataTN = TableName.valueOf(dataTable);
+            assertEquals(1, utility.getHBaseCluster().getRegions(dataTN).size());
+            utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    try {
+                        List<HRegionInfo> regions = admin.getTableRegions(dataTN);
+                        if (regions.size() > 1) {
+                            logger.info("Found region was split");
+                            return true;
+                        }
+                        if (regions.size() == 0) {
+                            // may happen while the region is in transition or closed
+                            logger.info("No region returned");
+                            return false;
+                        }
+                        HRegionInfo hRegion = regions.get(0);
+                        logger.info("Attempting to split region");
+                        // NOTE(review): Bytes.toBytes(2) is presumably not Phoenix's INTEGER row
+                        // key encoding, so the split point may fall outside the populated key
+                        // range; the test only checks that some split happens — confirm.
+                        admin.splitRegion(hRegion.getRegionName(), Bytes.toBytes(2));
+                        return false;
+                    } catch (NotServingRegionException nsre) {
+                        // during split
+                        return false;
+                    }
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false;
+            exec.shutdownNow();
+            if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
+                logger.warn("UPSERT SELECT writer threads did not terminate within 60 seconds");
+            }
+        }
+    }
+
+    /**
+     * Tests that UPSERT SELECT doesn't indefinitely block region closes.
+     * <p>
+     * Writer threads keep issuing slow UPSERT SELECTs while the data table's region is closed.
+     * The test waits up to 30 seconds, polling once per second, for the region to disappear
+     * from its hosting server's online regions.
+     */
+    @Test
+    public void testRegionCloseDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try {
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // keep running slow upsert selects, staggering the writers' start times
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300);
+            }
+
+            final HBaseTestingUtility utility = getUtility();
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final HRegionInfo dataRegion =
+                    admin.getTableRegions(TableName.valueOf(dataTable)).get(0);
+            // look up the server that actually hosts the region instead of assuming server 0
+            int dataRsIndex = utility.getHBaseCluster().getServerWith(dataRegion.getRegionName());
+            assertTrue("Data region is not hosted by any region server", dataRsIndex >= 0);
+            final HRegionServer dataRs = utility.getHBaseCluster().getRegionServer(dataRsIndex);
+            // try to close the region while UPSERT SELECTs are happening
+            logger.info("Closing data table region");
+            admin.closeRegion(dataRs.getServerName(), dataRegion);
+            // make sure the region is offline
+            utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    List<HRegionInfo> onlineRegions =
+                            admin.getOnlineRegions(dataRs.getServerName());
+                    for (HRegionInfo onlineRegion : onlineRegions) {
+                        if (onlineRegion.equals(dataRegion)) {
+                            logger.info("Data region still online");
+                            return false;
+                        }
+                    }
+                    logger.info("Region is no longer online");
+                    return true;
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false;
+            exec.shutdownNow();
+            if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
+                logger.warn("UPSERT SELECT writer threads did not terminate within 60 seconds");
+            }
+        }
+    }
public static class SlowBatchRegionObserver extends SimpleRegionObserver {
+ public static volatile boolean SLOW_MUTATE = false;
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
// model a slow batch that takes a long time
- if (miniBatchOp.size()==100) {
+ if ((miniBatchOp.size()==100 || SLOW_MUTATE) && c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable)) {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f65a793/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0773ebc..a2a1b5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
/**
* This lock used for synchronizing the state of
* {@link UngroupedAggregateRegionObserver#scansReferenceCount},
- * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible
+ * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables used to avoid possible
* dead lock situation in case below steps:
* 1. We get read lock when we start writing local indexes, deletes etc..
* 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they
@@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@GuardedBy("lock")
private int scansReferenceCount = 0;
@GuardedBy("lock")
- private boolean isRegionClosing = false;
+ private boolean isRegionClosingOrSplitting = false;
private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
private KeyValueBuilder kvBuilder;
private Configuration upsertSelectConfig;
@@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
*/
private void checkForRegionClosing() throws IOException {
synchronized (lock) {
- if(isRegionClosing) {
+ if(isRegionClosingOrSplitting) {
lock.notifyAll();
throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
}
@@ -499,13 +499,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
useIndexProto = false;
}
boolean acquiredLock = false;
- try {
- if(needToWrite) {
- synchronized (lock) {
- scansReferenceCount++;
- lock.notifyAll();
+ if(needToWrite) {
+ synchronized (lock) {
+ if (isRegionClosingOrSplitting) {
+ throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
}
+ scansReferenceCount++;
+ lock.notifyAll();
}
+ }
+ try {
region.startRegionOperation();
acquiredLock = true;
synchronized (innerScanner) {
@@ -1295,6 +1298,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// Don't allow splitting if operations need read and write to same region are going on in the
// the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
synchronized (lock) {
+ isRegionClosingOrSplitting = true;
if (scansReferenceCount > 0) {
throw new IOException("Operations like local index building/delete/upsert select"
+ " might be going on so not allowing to split.");
@@ -1319,12 +1323,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
throws IOException {
synchronized (lock) {
- isRegionClosing = true;
+ isRegionClosingOrSplitting = true;
while (scansReferenceCount > 0) {
try {
lock.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for completion of operations like local index building/delete/upsert select");
}
}
}