You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/29 19:29:17 UTC

[1/2] phoenix git commit: Revert "PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)"

Repository: phoenix
Updated Branches:
  refs/heads/master e9593529f -> f00380ef7


Revert "PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)"

This reverts commit 5d9572736a991f19121477a0822d4b8bf26b4c69.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f4e51143
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f4e51143
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f4e51143

Branch: refs/heads/master
Commit: f4e51143a35bdd48f025b925c9988f98de92a4c3
Parents: e959352
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Sep 29 12:26:09 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Sep 29 12:26:09 2017 -0700

----------------------------------------------------------------------
 .../UpsertSelectOverlappingBatchesIT.java       | 239 ++++---------------
 .../UngroupedAggregateRegionObserver.java       |  23 +-
 2 files changed, 53 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4e51143/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index fbf3231..53346b9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.execute;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -33,43 +32,25 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT {
-    private static final Logger logger = LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class);
-    private Properties props;
-    private static volatile String dataTable;
-    private String index;
-
+    
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
@@ -79,12 +60,7 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
         Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
-
-    @AfterClass
-    public static void tearDownClass() throws Exception {
-        getUtility().shutdownMiniCluster();
-    }
-
+    
     private class UpsertSelectRunner implements Callable<Boolean> {
     	private final String dataTable;
     	private final int minIndex;
@@ -113,185 +89,58 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
 				return true;
 			}
 		}
+    	
     }
-
-    private static class UpsertSelectLooper implements Runnable {
-        private UpsertSelectRunner runner;
-        public UpsertSelectLooper(UpsertSelectRunner runner) {
-            this.runner = runner;
-        }
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    runner.call();
-                }
-                catch (Exception e) {
-                    if (ExceptionUtils.indexOfThrowable(e, InterruptedException.class) != -1) {
-                        logger.info("Interrupted, exiting", e);
-                        Thread.currentThread().interrupt();
-                        return;
-                    }
-                    logger.error("Hit exception while writing", e);
-                }
-            }
-        }};
-
-    @Before
-    public void setup() throws Exception {
-        SlowBatchRegionObserver.SLOW_MUTATE = false;
-        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        dataTable = generateUniqueName();
-        index = "IDX_" + dataTable;
-        try (Connection conn = driver.connect(url, props)) {
-            conn.createStatement().execute("CREATE TABLE " + dataTable
-                    + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-            // create the index and ensure its empty as well
-            conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
-            conn.setAutoCommit(false);
-            for (int i = 0; i < 100; i++) {
-                stmt.setInt(1, i);
-                stmt.setString(2, "v1" + i);
-                stmt.setString(3, "v2" + i);
-                stmt.execute();
-            }
-            conn.commit();
-        }
-    }
-
+    
 	@Test
 	public void testUpsertSelectSameBatchConcurrently() throws Exception {
-		try (Connection conn = driver.connect(url, props)) {
-		        int numUpsertSelectRunners = 5;
-		        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
-		        CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
-		        List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
-		        // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
-		        futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
-		        // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
-		        for (int i = 0; i < 100; i += 25) {
-		            futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
-		        }
-		        int received = 0;
-		        while (received < futures.size()) {
-		            Future<Boolean> resultFuture = completionService.take(); 
-		            Boolean result = resultFuture.get();
-		            received++;
-		            assertTrue(result);
-		        }
-		        exec.shutdownNow();
+		final String dataTable = generateUniqueName();
+		final String index = "IDX_" + dataTable;
+		// create the table and ensure its empty
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		Connection conn = driver.connect(url, props);
+		conn.createStatement()
+				.execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+		// create the index and ensure its empty as well
+		conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
+
+		conn = DriverManager.getConnection(getUrl(), props);
+		PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
+		conn.setAutoCommit(false);
+		for (int i = 0; i < 100; i++) {
+			stmt.setInt(1, i);
+			stmt.setString(2, "v1" + i);
+			stmt.setString(3, "v2" + i);
+			stmt.execute();
+		}
+		conn.commit();
+
+		int numUpsertSelectRunners = 5;
+		ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+		CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
+		List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+		// run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
+		futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
+		// run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
+		for (int i = 0; i < 100; i += 25) {
+			futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
+		}
+		int received = 0;
+		while (received < futures.size()) {
+			Future<Boolean> resultFuture = completionService.take(); 
+			Boolean result = resultFuture.get();
+			received++;
+			assertTrue(result);
 		}
+		exec.shutdownNow();
+		conn.close();
 	}
-
-    /**
-     * Tests that splitting a region is not blocked indefinitely by UPSERT SELECT load
-     */
-	@Test
-    public void testSplitDuringUpsertSelect() throws Exception {
-        int numUpsertSelectRunners = 4;
-        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
-        try (Connection conn = driver.connect(url, props)) {
-            final UpsertSelectRunner upsertSelectRunner =
-                    new UpsertSelectRunner(dataTable, 0, 105, 1);
-            // keep running slow upsert selects
-            SlowBatchRegionObserver.SLOW_MUTATE = true;
-            for (int i = 0; i < numUpsertSelectRunners; i++) {
-                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
-                Thread.sleep(300);
-            }
-
-            // keep trying to split the region
-            final HBaseTestingUtility utility = getUtility();
-            final HBaseAdmin admin = utility.getHBaseAdmin();
-            final TableName dataTN = TableName.valueOf(dataTable);
-            assertEquals(1, utility.getHBaseCluster().getRegions(dataTN).size());
-            utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    try {
-                        List<HRegionInfo> regions = admin.getTableRegions(dataTN);
-                        if (regions.size() > 1) {
-                            logger.info("Found region was split");
-                            return true;
-                        }
-                        if (regions.size() == 0) {
-                            // This happens when region in transition or closed
-                            logger.info("No region returned");
-                            return false;
-                        }
-                        ;
-                        HRegionInfo hRegion = regions.get(0);
-                        logger.info("Attempting to split region");
-                        admin.splitRegion(hRegion.getRegionName(), Bytes.toBytes(2));
-                        return false;
-                    } catch (NotServingRegionException nsre) {
-                        // during split
-                        return false;
-                    }
-                }
-            });
-        } finally {
-            SlowBatchRegionObserver.SLOW_MUTATE = false;
-            exec.shutdownNow();
-            exec.awaitTermination(60, TimeUnit.SECONDS);
-        }
-    }
-
-    /**
-     * Tests that UPSERT SELECT doesn't indefinitely block region closes
-     */
-    @Test
-    public void testRegionCloseDuringUpsertSelect() throws Exception {
-        int numUpsertSelectRunners = 4;
-        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
-        try (Connection conn = driver.connect(url, props)) {
-            final UpsertSelectRunner upsertSelectRunner =
-                    new UpsertSelectRunner(dataTable, 0, 105, 1);
-            // keep running slow upsert selects
-            SlowBatchRegionObserver.SLOW_MUTATE = true;
-            for (int i = 0; i < numUpsertSelectRunners; i++) {
-                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
-                Thread.sleep(300);
-            }
-
-            final HBaseTestingUtility utility = getUtility();
-            // try to close the region while UPSERT SELECTs are happening,
-            final HRegionServer dataRs = utility.getHBaseCluster().getRegionServer(0);
-            final HBaseAdmin admin = utility.getHBaseAdmin();
-            final HRegionInfo dataRegion =
-                    admin.getTableRegions(TableName.valueOf(dataTable)).get(0);
-            logger.info("Closing data table region");
-            admin.closeRegion(dataRs.getServerName(), dataRegion);
-            // make sure the region is offline
-            utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    List<HRegionInfo> onlineRegions =
-                            admin.getOnlineRegions(dataRs.getServerName());
-                    for (HRegionInfo onlineRegion : onlineRegions) {
-                        if (onlineRegion.equals(dataRegion)) {
-                            logger.info("Data region still online");
-                            return false;
-                        }
-                    }
-                    logger.info("Region is no longer online");
-                    return true;
-                }
-            });
-        } finally {
-            SlowBatchRegionObserver.SLOW_MUTATE = false;
-            exec.shutdownNow();
-            exec.awaitTermination(60, TimeUnit.SECONDS);
-        }
-    }
     
     public static class SlowBatchRegionObserver extends SimpleRegionObserver {
-        public static volatile boolean SLOW_MUTATE = false;
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
         	// model a slow batch that takes a long time
-            if ((miniBatchOp.size()==100 || SLOW_MUTATE) && c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable)) {
+            if (miniBatchOp.size()==100) {
             	try {
 					Thread.sleep(6000);
 				} catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4e51143/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 582e606..30f89cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     /**
      * This lock used for synchronizing the state of
      * {@link UngroupedAggregateRegionObserver#scansReferenceCount},
-     * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables used to avoid possible
+     * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible
      * dead lock situation in case below steps: 
      * 1. We get read lock when we start writing local indexes, deletes etc.. 
      * 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they 
@@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     @GuardedBy("lock")
     private int scansReferenceCount = 0;
     @GuardedBy("lock")
-    private boolean isRegionClosingOrSplitting = false;
+    private boolean isRegionClosing = false;
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
@@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
      */
     private void checkForRegionClosing() throws IOException {
         synchronized (lock) {
-            if(isRegionClosingOrSplitting) {
+            if(isRegionClosing) {
                 lock.notifyAll();
                 throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
             }
@@ -499,16 +499,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             useIndexProto = false;
         }
         boolean acquiredLock = false;
-        if(needToWrite) {
-            synchronized (lock) {
-                if (isRegionClosingOrSplitting) {
-                    throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
+        try {
+            if(needToWrite) {
+                synchronized (lock) {
+                    scansReferenceCount++;
+                    lock.notifyAll();
                 }
-                scansReferenceCount++;
-                lock.notifyAll();
             }
-        }
-        try {
             region.startRegionOperation();
             acquiredLock = true;
             synchronized (innerScanner) {
@@ -1298,7 +1295,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // Don't allow splitting if operations need read and write to same region are going on in the
         // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
         synchronized (lock) {
-            isRegionClosingOrSplitting = true;
             if (scansReferenceCount > 0) {
                 throw new IOException("Operations like local index building/delete/upsert select"
                         + " might be going on so not allowing to split.");
@@ -1323,13 +1319,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
             throws IOException {
         synchronized (lock) {
-            isRegionClosingOrSplitting = true;
+            isRegionClosing = true;
             while (scansReferenceCount > 0) {
                 try {
                     lock.wait(1000);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
-                    throw new IOException("Interrupted while waiting for completion of operations like local index building/delete/upsert select");
                 }
             }
         }


[2/2] phoenix git commit: PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)

Posted by ja...@apache.org.
PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f00380ef
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f00380ef
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f00380ef

Branch: refs/heads/master
Commit: f00380ef79795a675cfa5b50082147a273ac26f9
Parents: f4e5114
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Sep 29 12:26:41 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Sep 29 12:26:41 2017 -0700

----------------------------------------------------------------------
 .../UpsertSelectOverlappingBatchesIT.java       | 245 +++++++++++++++----
 .../UngroupedAggregateRegionObserver.java       |  16 +-
 2 files changed, 210 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f00380ef/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index 53346b9..dc9de81 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.execute;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -32,35 +33,59 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT {
-    
+    private static final Logger logger = LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class);
+    private Properties props;
+    private static volatile String dataTable;
+    private String index;
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
         serverProps.put("hbase.coprocessor.region.classes", SlowBatchRegionObserver.class.getName());
         serverProps.put("hbase.rowlock.wait.duration", "5000");
         serverProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "100");
-        Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()));
     }
-    
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        SlowBatchRegionObserver.SLOW_MUTATE = false;
+        getUtility().shutdownMiniCluster();
+    }
+
     private class UpsertSelectRunner implements Callable<Boolean> {
     	private final String dataTable;
     	private final int minIndex;
@@ -89,58 +114,186 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
 				return true;
 			}
 		}
-    	
     }
-    
+
+    private static class UpsertSelectLooper implements Runnable {
+        private UpsertSelectRunner runner;
+        public UpsertSelectLooper(UpsertSelectRunner runner) {
+            this.runner = runner;
+        }
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    runner.call();
+                }
+                catch (Exception e) {
+                    if (ExceptionUtils.indexOfThrowable(e, InterruptedException.class) != -1) {
+                        logger.info("Interrupted, exiting", e);
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                    logger.error("Hit exception while writing", e);
+                }
+            }
+        }};
+
+    @Before
+    public void setup() throws Exception {
+        SlowBatchRegionObserver.SLOW_MUTATE = false;
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+        dataTable = generateUniqueName();
+        index = "IDX_" + dataTable;
+        try (Connection conn = driver.connect(url, props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTable
+                    + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            // create the index and ensure its empty as well
+            conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
+            conn.setAutoCommit(false);
+            for (int i = 0; i < 100; i++) {
+                stmt.setInt(1, i);
+                stmt.setString(2, "v1" + i);
+                stmt.setString(3, "v2" + i);
+                stmt.execute();
+            }
+            conn.commit();
+        }
+    }
+
 	@Test
 	public void testUpsertSelectSameBatchConcurrently() throws Exception {
-		final String dataTable = generateUniqueName();
-		final String index = "IDX_" + dataTable;
-		// create the table and ensure its empty
-		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-		Connection conn = driver.connect(url, props);
-		conn.createStatement()
-				.execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-		// create the index and ensure its empty as well
-		conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
-
-		conn = DriverManager.getConnection(getUrl(), props);
-		PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
-		conn.setAutoCommit(false);
-		for (int i = 0; i < 100; i++) {
-			stmt.setInt(1, i);
-			stmt.setString(2, "v1" + i);
-			stmt.setString(3, "v2" + i);
-			stmt.execute();
-		}
-		conn.commit();
-
-		int numUpsertSelectRunners = 5;
-		ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
-		CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
-		List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
-		// run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
-		futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
-		// run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
-		for (int i = 0; i < 100; i += 25) {
-			futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
-		}
-		int received = 0;
-		while (received < futures.size()) {
-			Future<Boolean> resultFuture = completionService.take(); 
-			Boolean result = resultFuture.get();
-			received++;
-			assertTrue(result);
+		try (Connection conn = driver.connect(url, props)) {
+		        int numUpsertSelectRunners = 5;
+		        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+		        CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
+		        List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+		        // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
+		        futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
+		        // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
+		        for (int i = 0; i < 100; i += 25) {
+		            futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
+		        }
+		        int received = 0;
+		        while (received < futures.size()) {
+		            Future<Boolean> resultFuture = completionService.take();
+		            Boolean result = resultFuture.get();
+		            received++;
+		            assertTrue(result);
+		        }
+		        exec.shutdownNow();
 		}
-		exec.shutdownNow();
-		conn.close();
 	}
+
+    /**
+     * Tests that splitting a region is not blocked indefinitely by UPSERT SELECT load
+     */
+	@Test
+    public void testSplitDuringUpsertSelect() throws Exception { // waits until the data table reports more than one region
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); // one pool thread per looper
+        try (Connection conn = driver.connect(url, props)) { // NOTE(review): conn is never referenced in this method
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // Enable the observer's slow path (6 s sleep per data-table batch), then start loopers sharing one runner.
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300); // stagger the submissions by 300 ms
+            }
+
+            // Repeatedly request a split of the single data region until the table has more than one region.
+            final HBaseTestingUtility utility = getUtility();
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final TableName dataTN = TableName.valueOf(dataTable);
+            assertEquals(1, utility.getHBaseCluster().getRegions(dataTN).size()); // precondition: exactly one region
+            utility.waitFor(60000L, 1000, new Waiter.Predicate<Exception>() { // presumably 60 s timeout, 1 s poll interval - confirm
+                @Override
+                public boolean evaluate() throws Exception {
+                    try {
+                        List<HRegionInfo> regions = admin.getTableRegions(dataTN);
+                        if (regions.size() > 1) {
+                            logger.info("Found region was split");
+                            return true;
+                        }
+                        if (regions.size() == 0) {
+                            // No region listed (in transition or closed): make no split request, just poll again.
+                            logger.info("No region returned");
+                            return false;
+                        }
+                        ; // NOTE(review): stray empty statement, no effect
+                        HRegionInfo hRegion = regions.get(0);
+                        logger.info("Attempting to split region");
+                        admin.splitRegion(hRegion.getRegionName(), Bytes.toBytes(2)); // split point is Bytes.toBytes(2)
+                        return false; // a later poll observes whether the region count changed
+                    } catch (NotServingRegionException nsre) {
+                        // Region not being served (expected during a split): report "not done" and poll again.
+                        return false;
+                    }
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false; // reset the static flag set at the start of this test
+            exec.shutdownNow(); // ask the running loopers to stop
+            exec.awaitTermination(60, TimeUnit.SECONDS); // wait up to 60 s for them to finish
+        }
+    }
+
+    /**
+     * Tests that UPSERT SELECT doesn't indefinitely block region closes: closes the data region while slow loopers run, then waits for it to go offline.
+     */
+    @Test
+    public void testRegionCloseDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); // one pool thread per looper
+        try (Connection conn = driver.connect(url, props)) { // NOTE(review): conn is never referenced in this method
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // Enable the observer's slow path (6 s sleep per data-table batch), then start loopers sharing one runner.
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300); // stagger the submissions by 300 ms
+            }
+
+            final HBaseTestingUtility utility = getUtility();
+            // Request a close of the data table's first region while the loopers are still running.
+            final HRegionServer dataRs = utility.getHBaseCluster().getRegionServer(0); // NOTE(review): assumes the region is hosted on server 0
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final HRegionInfo dataRegion =
+                    admin.getTableRegions(TableName.valueOf(dataTable)).get(0);
+            logger.info("Closing data table region");
+            admin.closeRegion(dataRs.getServerName(), dataRegion);
+            // Poll the server's online-region list until the closed region no longer appears in it.
+            utility.waitFor(60000L, 1000, new Waiter.Predicate<Exception>() { // presumably 60 s timeout, 1 s poll interval - confirm
+                @Override
+                public boolean evaluate() throws Exception {
+                    List<HRegionInfo> onlineRegions =
+                            admin.getOnlineRegions(dataRs.getServerName());
+                    for (HRegionInfo onlineRegion : onlineRegions) {
+                        if (onlineRegion.equals(dataRegion)) {
+                            logger.info("Data region still online");
+                            return false; // still listed as online: keep polling
+                        }
+                    }
+                    logger.info("Region is no longer online");
+                    return true;
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false; // reset the static flag set at the start of this test
+            exec.shutdownNow(); // ask the running loopers to stop
+            exec.awaitTermination(60, TimeUnit.SECONDS); // wait up to 60 s for them to finish
+        }
+    }
     
     public static class SlowBatchRegionObserver extends SimpleRegionObserver {
+        public static volatile boolean SLOW_MUTATE = false;
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
         	// model a slow batch that takes a long time
-            if (miniBatchOp.size()==100) {
+            if ((miniBatchOp.size()==100 || SLOW_MUTATE) && c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable)) {
             	try {
 					Thread.sleep(6000);
 				} catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f00380ef/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 30f89cb..c3024a7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     /**
      * This lock used for synchronizing the state of
      * {@link UngroupedAggregateRegionObserver#scansReferenceCount},
-     * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible
+     * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables used to avoid possible
      * dead lock situation in case below steps: 
      * 1. We get read lock when we start writing local indexes, deletes etc.. 
      * 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they 
@@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     @GuardedBy("lock")
     private int scansReferenceCount = 0;
     @GuardedBy("lock")
-    private boolean isRegionClosing = false;
+    private boolean isRegionClosingOrSplitting = false;
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
@@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
      */
     private void checkForRegionClosing() throws IOException {
         synchronized (lock) {
-            if(isRegionClosing) {
+            if(isRegionClosingOrSplitting) {
                 lock.notifyAll();
                 throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
             }
@@ -499,10 +499,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             useIndexProto = false;
         }
         boolean acquiredLock = false;
+        boolean incrScanRefCount = false;
         try {
             if(needToWrite) {
                 synchronized (lock) {
+                    if (isRegionClosingOrSplitting) {
+                        throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
+                    }
                     scansReferenceCount++;
+                    incrScanRefCount = true;
                     lock.notifyAll();
                 }
             }
@@ -755,7 +760,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 }
             }
         } finally {
-            if (needToWrite) {
+            if (needToWrite && incrScanRefCount) {
                 synchronized (lock) {
                     scansReferenceCount--;
                     if (scansReferenceCount < 0) {
@@ -1295,6 +1300,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // Don't allow splitting if operations need read and write to same region are going on in the
         // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
         synchronized (lock) {
+            isRegionClosingOrSplitting = true;
             if (scansReferenceCount > 0) {
                 throw new IOException("Operations like local index building/delete/upsert select"
                         + " might be going on so not allowing to split.");
@@ -1319,7 +1325,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
             throws IOException {
         synchronized (lock) {
-            isRegionClosing = true;
+            isRegionClosingOrSplitting = true;
             while (scansReferenceCount > 0) {
                 try {
                     lock.wait(1000);