Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/26 20:14:49 UTC

[4/4] phoenix git commit: PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)

PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)
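
The change replaces the blocking interaction between writing scans (UPSERT SELECT, local index builds, deletes) and region lifecycle events with a fail-fast flag. The following is a minimal, self-contained sketch of that coordination pattern under a simplified guard class; the class and method names are illustrative only and are not the actual Phoenix or HBase classes.

    import java.io.IOException;

    // Hypothetical guard class sketching the pattern this commit moves to:
    // writing scans register while they run, and a region close or split sets
    // a flag so that new scans fail fast instead of blocking the close/split.
    class ScanCloseCoordinator {
        private final Object lock = new Object();
        private int scansReferenceCount = 0;
        private boolean isRegionClosingOrSplitting = false;

        // Called before a scan that writes; fails fast once a close/split has started.
        void startWritingScan() throws IOException {
            synchronized (lock) {
                if (isRegionClosingOrSplitting) {
                    throw new IOException("Temporarily unable to write from scan"
                            + " because region is closing or splitting");
                }
                scansReferenceCount++;
                lock.notifyAll();
            }
        }

        // Called when the writing scan finishes, successfully or not.
        void endWritingScan() {
            synchronized (lock) {
                scansReferenceCount--;
                lock.notifyAll();
            }
        }

        // Called from a preClose/preSplit-style hook: reject new writing scans,
        // then wait for the in-flight ones to drain.
        void beginCloseOrSplit() throws IOException {
            synchronized (lock) {
                isRegionClosingOrSplitting = true;
                while (scansReferenceCount > 0) {
                    try {
                        lock.wait(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted while waiting for writing scans to finish");
                    }
                }
            }
        }
    }

The design point is that the flag is set before the drain loop, so a split or close only waits on scans that were already in flight; anything arriving later is rejected immediately and can be retried by the caller.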


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6f65a793
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6f65a793
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6f65a793

Branch: refs/heads/4.x-HBase-1.2
Commit: 6f65a7935b640969e570b870de9fa59e2a5bca67
Parents: 4354a2c
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Sep 26 09:31:44 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 13:11:12 2017 -0700

----------------------------------------------------------------------
 .../UpsertSelectOverlappingBatchesIT.java       | 239 +++++++++++++++----
 .../UngroupedAggregateRegionObserver.java       |  23 +-
 2 files changed, 209 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f65a793/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index 53346b9..fbf3231 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.execute;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -32,25 +33,43 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT {
-    
+    private static final Logger logger = LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class);
+    private Properties props;
+    private static volatile String dataTable;
+    private String index;
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
@@ -60,7 +79,12 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
         Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
-    
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        getUtility().shutdownMiniCluster();
+    }
+
     private class UpsertSelectRunner implements Callable<Boolean> {
     	private final String dataTable;
     	private final int minIndex;
@@ -89,58 +113,185 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
 				return true;
 			}
 		}
-    	
     }
-    
+
+    private static class UpsertSelectLooper implements Runnable {
+        private UpsertSelectRunner runner;
+        public UpsertSelectLooper(UpsertSelectRunner runner) {
+            this.runner = runner;
+        }
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    runner.call();
+                }
+                catch (Exception e) {
+                    if (ExceptionUtils.indexOfThrowable(e, InterruptedException.class) != -1) {
+                        logger.info("Interrupted, exiting", e);
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                    logger.error("Hit exception while writing", e);
+                }
+            }
+        }};
+
+    @Before
+    public void setup() throws Exception {
+        SlowBatchRegionObserver.SLOW_MUTATE = false;
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        dataTable = generateUniqueName();
+        index = "IDX_" + dataTable;
+        try (Connection conn = driver.connect(url, props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTable
+                    + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            // create the index and ensure it's empty as well
+            conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
+            conn.setAutoCommit(false);
+            for (int i = 0; i < 100; i++) {
+                stmt.setInt(1, i);
+                stmt.setString(2, "v1" + i);
+                stmt.setString(3, "v2" + i);
+                stmt.execute();
+            }
+            conn.commit();
+        }
+    }
+
 	@Test
 	public void testUpsertSelectSameBatchConcurrently() throws Exception {
-		final String dataTable = generateUniqueName();
-		final String index = "IDX_" + dataTable;
-		// create the table and ensure its empty
-		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-		Connection conn = driver.connect(url, props);
-		conn.createStatement()
-				.execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-		// create the index and ensure its empty as well
-		conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
-
-		conn = DriverManager.getConnection(getUrl(), props);
-		PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
-		conn.setAutoCommit(false);
-		for (int i = 0; i < 100; i++) {
-			stmt.setInt(1, i);
-			stmt.setString(2, "v1" + i);
-			stmt.setString(3, "v2" + i);
-			stmt.execute();
-		}
-		conn.commit();
-
-		int numUpsertSelectRunners = 5;
-		ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
-		CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
-		List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
-		// run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
-		futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
-		// run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
-		for (int i = 0; i < 100; i += 25) {
-			futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
-		}
-		int received = 0;
-		while (received < futures.size()) {
-			Future<Boolean> resultFuture = completionService.take(); 
-			Boolean result = resultFuture.get();
-			received++;
-			assertTrue(result);
+		try (Connection conn = driver.connect(url, props)) {
+		        int numUpsertSelectRunners = 5;
+		        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+		        CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
+		        List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+		        // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
+		        futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
+		        // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
+		        for (int i = 0; i < 100; i += 25) {
+		            futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
+		        }
+		        int received = 0;
+		        while (received < futures.size()) {
+		            Future<Boolean> resultFuture = completionService.take(); 
+		            Boolean result = resultFuture.get();
+		            received++;
+		            assertTrue(result);
+		        }
+		        exec.shutdownNow();
 		}
-		exec.shutdownNow();
-		conn.close();
 	}
+
+    /**
+     * Tests that splitting a region is not blocked indefinitely by UPSERT SELECT load
+     */
+	@Test
+    public void testSplitDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try (Connection conn = driver.connect(url, props)) {
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // keep running slow upsert selects
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300);
+            }
+
+            // keep trying to split the region
+            final HBaseTestingUtility utility = getUtility();
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final TableName dataTN = TableName.valueOf(dataTable);
+            assertEquals(1, utility.getHBaseCluster().getRegions(dataTN).size());
+            utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    try {
+                        List<HRegionInfo> regions = admin.getTableRegions(dataTN);
+                        if (regions.size() > 1) {
+                            logger.info("Found region was split");
+                            return true;
+                        }
+                        if (regions.size() == 0) {
+                            // This happens when region in transition or closed
+                            logger.info("No region returned");
+                            return false;
+                        }
+                        ;
+                        HRegionInfo hRegion = regions.get(0);
+                        logger.info("Attempting to split region");
+                        admin.splitRegion(hRegion.getRegionName(), Bytes.toBytes(2));
+                        return false;
+                    } catch (NotServingRegionException nsre) {
+                        // during split
+                        return false;
+                    }
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false;
+            exec.shutdownNow();
+            exec.awaitTermination(60, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Tests that UPSERT SELECT doesn't indefinitely block region closes
+     */
+    @Test
+    public void testRegionCloseDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try (Connection conn = driver.connect(url, props)) {
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // keep running slow upsert selects
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300);
+            }
+
+            final HBaseTestingUtility utility = getUtility();
+            // try to close the region while UPSERT SELECTs are happening,
+            final HRegionServer dataRs = utility.getHBaseCluster().getRegionServer(0);
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final HRegionInfo dataRegion =
+                    admin.getTableRegions(TableName.valueOf(dataTable)).get(0);
+            logger.info("Closing data table region");
+            admin.closeRegion(dataRs.getServerName(), dataRegion);
+            // make sure the region is offline
+            utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    List<HRegionInfo> onlineRegions =
+                            admin.getOnlineRegions(dataRs.getServerName());
+                    for (HRegionInfo onlineRegion : onlineRegions) {
+                        if (onlineRegion.equals(dataRegion)) {
+                            logger.info("Data region still online");
+                            return false;
+                        }
+                    }
+                    logger.info("Region is no longer online");
+                    return true;
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false;
+            exec.shutdownNow();
+            exec.awaitTermination(60, TimeUnit.SECONDS);
+        }
+    }
     
     public static class SlowBatchRegionObserver extends SimpleRegionObserver {
+        public static volatile boolean SLOW_MUTATE = false;
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
         	// model a slow batch that takes a long time
-            if (miniBatchOp.size()==100) {
+            if ((miniBatchOp.size()==100 || SLOW_MUTATE) && c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable)) {
             	try {
 					Thread.sleep(6000);
 				} catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f65a793/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0773ebc..a2a1b5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     /**
      * This lock used for synchronizing the state of
      * {@link UngroupedAggregateRegionObserver#scansReferenceCount},
-     * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible
+     * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables used to avoid possible
      * dead lock situation in case below steps: 
      * 1. We get read lock when we start writing local indexes, deletes etc.. 
      * 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they 
@@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     @GuardedBy("lock")
     private int scansReferenceCount = 0;
     @GuardedBy("lock")
-    private boolean isRegionClosing = false;
+    private boolean isRegionClosingOrSplitting = false;
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
@@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
      */
     private void checkForRegionClosing() throws IOException {
         synchronized (lock) {
-            if(isRegionClosing) {
+            if(isRegionClosingOrSplitting) {
                 lock.notifyAll();
                 throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
             }
@@ -499,13 +499,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             useIndexProto = false;
         }
         boolean acquiredLock = false;
-        try {
-            if(needToWrite) {
-                synchronized (lock) {
-                    scansReferenceCount++;
-                    lock.notifyAll();
+        if(needToWrite) {
+            synchronized (lock) {
+                if (isRegionClosingOrSplitting) {
+                    throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
                 }
+                scansReferenceCount++;
+                lock.notifyAll();
             }
+        }
+        try {
             region.startRegionOperation();
             acquiredLock = true;
             synchronized (innerScanner) {
@@ -1295,6 +1298,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // Don't allow splitting if operations need read and write to same region are going on in the
         // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
         synchronized (lock) {
+            isRegionClosingOrSplitting = true;
             if (scansReferenceCount > 0) {
                 throw new IOException("Operations like local index building/delete/upsert select"
                         + " might be going on so not allowing to split.");
@@ -1319,12 +1323,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
             throws IOException {
         synchronized (lock) {
-            isRegionClosing = true;
+            isRegionClosingOrSplitting = true;
             while (scansReferenceCount > 0) {
                 try {
                     lock.wait(1000);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted while waiting for completion of operations like local index building/delete/upsert select");
                 }
             }
         }
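
For completeness, a writing scan would pair the fail-fast check with a finally block so the reference count always drains and preClose/preSplit can make progress. This is a usage sketch against the hypothetical ScanCloseCoordinator above, not the actual observer code; the matching decrement in UngroupedAggregateRegionObserver is outside the hunks quoted in this mail.

    // Illustrative only: how a writing scan would use the guard sketched earlier.
    void runWritingScan(ScanCloseCoordinator guard) throws IOException {
        guard.startWritingScan();   // throws immediately if a close/split has begun
        try {
            // ... read rows and issue the region writes here ...
        } finally {
            guard.endWritingScan(); // always decrement so close/split can proceed
        }
    }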