You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2013/03/13 18:53:00 UTC

svn commit: r1456064 - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-server/src/test/java/org/apache/hadoop/hbase/ hbase-server/src/test/java/org/a...

Author: enis
Date: Wed Mar 13 17:53:00 2013
New Revision: 1456064

URL: http://svn.apache.org/r1456064
Log:
HBASE-8036 ProtobufUtil.multi behavior is inconsistent in case of errors

Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1456064&r1=1456063&r2=1456064&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Mar 13 17:53:00 2013
@@ -18,7 +18,35 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.protobuf.ServiceException;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -65,34 +93,7 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.protobuf.ServiceException;
 
 /**
  * A non-instantiable class that manages {@link HConnection}s.
@@ -932,7 +933,7 @@ public class HConnectionManager {
     throws IOException {
       return locateRegions (tableName, false, true);
     }
-    
+
     @Override
     public List<HRegionLocation> locateRegions(final byte[] tableName, final boolean useCache,
         final boolean offlined) throws IOException {
@@ -2087,7 +2088,9 @@ public class HConnectionManager {
             for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
               for (Action<R> action : actions) {
                 Row row = action.getAction();
-                hci.updateCachedLocations(tableName, row, exception, currentTask.getSecond());
+                // Do not use the exception for updating cache because it might be coming from
+                // any of the regions in the MultiAction.
+                hci.updateCachedLocations(tableName, row, null, currentTask.getSecond());
                 if (noRetry) {
                   errors.add(exception, row, currentTask);
                 } else {

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1456064&r1=1456063&r2=1456064&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Mar 13 17:53:00 2013
@@ -17,15 +17,22 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -107,22 +114,15 @@ import org.apache.hadoop.hbase.util.Pair
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableSet;
-
-import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
 
 /**
  * Protobufs utility.
@@ -1021,37 +1021,43 @@ public final class ProtobufUtil {
    */
   public static <R> MultiResponse multi(final ClientProtocol client,
       final MultiAction<R> multi) throws IOException {
-    try {
-      MultiResponse response = new MultiResponse();
-      for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
-        byte[] regionName = e.getKey();
-        int rowMutations = 0;
-        List<Action<R>> actions = e.getValue();
-        for (Action<R> action: actions) {
-          Row row = action.getAction();
-          if (row instanceof RowMutations) {
+    MultiResponse response = new MultiResponse();
+    for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
+      byte[] regionName = e.getKey();
+      int rowMutations = 0;
+      List<Action<R>> actions = e.getValue();
+      for (Action<R> action: actions) {
+        Row row = action.getAction();
+        if (row instanceof RowMutations) {
+          try {
             MultiRequest request =
-              RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
+                RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
             client.multi(null, request);
             response.add(regionName, action.getOriginalIndex(), new Result());
-            rowMutations++;
+          } catch (ServiceException se) {
+            response.add(regionName, action.getOriginalIndex(), getRemoteException(se));
           }
+          rowMutations++;
         }
-        if (actions.size() > rowMutations) {
+      }
+      if (actions.size() > rowMutations) {
+        Exception ex = null;
+        List<Object> results = null;
+        try {
           MultiRequest request =
-            RequestConverter.buildMultiRequest(regionName, actions);
+              RequestConverter.buildMultiRequest(regionName, actions);
           ClientProtos.MultiResponse proto = client.multi(null, request);
-          List<Object> results = ResponseConverter.getResults(proto);
-          for (int i = 0, n = results.size(); i < n; i++) {
-            int originalIndex = actions.get(i).getOriginalIndex();
-            response.add(regionName, originalIndex, results.get(i));
-          }
+          results = ResponseConverter.getResults(proto);
+        } catch (ServiceException se) {
+          ex = getRemoteException(se);
+        }
+        for (int i = 0, n = actions.size(); i < n; i++) {
+          int originalIndex = actions.get(i).getOriginalIndex();
+          response.add(regionName, originalIndex, results == null ? ex : results.get(i));
         }
       }
-      return response;
-    } catch (ServiceException se) {
-      throw getRemoteException(se);
     }
+    return response;
   }
 
   /**

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1456064&r1=1456063&r2=1456064&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Wed Mar 13 17:53:00 2013
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
 import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -2471,4 +2472,19 @@ public class HBaseTestingUtility extends
       boolean failIfTimeout, Predicate<E> predicate) throws E {
     return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
   }
+
+  /**
+   * Returns a {@link Predicate} for checking that there are no regions in transition in master
+   */
+  public Waiter.Predicate<Exception> predicateNoRegionsInTransition() {
+    return new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        final RegionStates regionStates = getMiniHBaseCluster().getMaster()
+            .getAssignmentManager().getRegionStates();
+        return !regionStates.isRegionsInTransition();
+      }
+    };
+  }
+
 }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1456064&r1=1456063&r2=1456064&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Mar 13 17:53:00 2013
@@ -18,10 +18,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -38,11 +38,17 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -52,6 +58,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+
 /**
  * This class is for testing HCM features
  */
@@ -62,8 +70,10 @@ public class TestHCM {
   private static final byte[] TABLE_NAME = Bytes.toBytes("test");
   private static final byte[] TABLE_NAME1 = Bytes.toBytes("test1");
   private static final byte[] TABLE_NAME2 = Bytes.toBytes("test2");
+  private static final byte[] TABLE_NAME3 = Bytes.toBytes("test3");
   private static final byte[] FAM_NAM = Bytes.toBytes("f");
   private static final byte[] ROW = Bytes.toBytes("bbb");
+  private static final byte[] ROW_X = Bytes.toBytes("xxx");
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -106,14 +116,14 @@ public class TestHCM {
   private static int getHConnectionManagerCacheSize(){
     return HConnectionTestingUtility.getConnectionCount();
   }
-  
+
   @Test
   public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
     // Save off current HConnections
-    Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances = 
+    Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
         new HashMap<HConnectionKey, HConnectionImplementation>();
     oldHBaseInstances.putAll(HConnectionManager.HBASE_INSTANCES);
-    
+
     HConnectionManager.HBASE_INSTANCES.clear();
 
     try {
@@ -536,6 +546,120 @@ public class TestHCM {
     conn.close();
   }
 
+  @Test
+  public void testMulti() throws Exception {
+    HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
+    TEST_UTIL.createMultiRegions(table, FAM_NAM);
+    HConnectionManager.HConnectionImplementation conn =
+      (HConnectionManager.HConnectionImplementation)
+        HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+
+    // We're now going to move the region and check that it works for the client
+    // First a new put to add the location in the cache
+    conn.clearRegionCache(TABLE_NAME3);
+    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
+
+    TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+
+    // We can wait for all regions to be online, that makes log reading easier when debugging
+    while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Thread.sleep(1);
+    }
+
+    Put put = new Put(ROW_X);
+    put.add(FAM_NAM, ROW_X, ROW_X);
+    table.put(put);
+
+    // Now moving the region to the second server
+    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X);
+    byte[] regionName = toMove.getRegionInfo().getRegionName();
+    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
+
+    // Choose the other server.
+    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
+    int destServerId = (curServerId == 0 ? 1 : 0);
+
+    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
+    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
+
+    ServerName destServerName = destServer.getServerName();
+
+    //find another row in the cur server that is less than ROW_X
+    List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
+    byte[] otherRow = null;
+    for (HRegion region : regions) {
+      if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
+          && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
+        otherRow = region.getRegionInfo().getStartKey();
+        break;
+      }
+    }
+    assertNotNull(otherRow);
+    Put put2 = new Put(otherRow);
+    put2.add(FAM_NAM, otherRow, otherRow);
+    table.put(put2); //cache put2's location
+
+    // Check that we are in the expected state
+    Assert.assertTrue(curServer != destServer);
+    Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
+    Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
+    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
+    Assert.assertNull(destServer.getOnlineRegion(regionName));
+    Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
+        getAssignmentManager().getRegionStates().isRegionsInTransition());
+
+    // Moving. It's possible that we don't have all the regions online at this point, so
+    //  the test must depend only on the region we're looking at.
+    LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
+    TEST_UTIL.getHBaseAdmin().move(
+      toMove.getRegionInfo().getEncodedNameAsBytes(),
+      destServerName.getServerName().getBytes()
+    );
+
+    while (destServer.getOnlineRegion(regionName) == null ||
+        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+        master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      // wait for the move to be finished
+      Thread.sleep(1);
+    }
+
+    LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
+
+    // Check our new state.
+    Assert.assertNull(curServer.getOnlineRegion(regionName));
+    Assert.assertNotNull(destServer.getOnlineRegion(regionName));
+    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
+    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
+
+
+    // Cache was NOT updated and points to the wrong server
+    Assert.assertFalse(
+        conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
+
+    // Hijack the number of retries to fail after 2 tries
+    Field numRetries = conn.getClass().getDeclaredField("numRetries");
+    numRetries.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
+    final int prevNumRetriesVal = (Integer)numRetries.get(conn);
+    numRetries.set(conn, 2);
+
+    Put put3 = new Put(ROW_X);
+    put3.add(FAM_NAM, ROW_X, ROW_X);
+    Put put4 = new Put(otherRow);
+    put4.add(FAM_NAM, otherRow, otherRow);
+
+    // do multi
+    table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
+                                                 // second we get RegionMovedException.
+
+    numRetries.set(conn, prevNumRetriesVal);
+    table.close();
+    conn.close();
+  }
 
 }
 

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1456064&r1=1456063&r2=1456064&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java Wed Mar 13 17:53:00 2013
@@ -35,10 +35,9 @@ import org.apache.commons.logging.impl.L
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.HBaseClient;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -85,16 +84,13 @@ public class TestMultiParallel {
     if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
       // Distribute regions
       UTIL.getMiniHBaseCluster().getMaster().balance();
+
       // Wait until completing balance
-      final RegionStates regionStates = UTIL.getMiniHBaseCluster().getMaster()
-          .getAssignmentManager().getRegionStates();
-      UTIL.waitFor(15 * 1000, new Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return !regionStates.isRegionsInTransition();
-        }
-      });
+      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
     }
+    HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration());
+    conn.clearRegionCache();
+    conn.close();
     LOG.info("before done");
   }
 
@@ -143,7 +139,7 @@ public class TestMultiParallel {
    * @throws NoSuchFieldException
    * @throws SecurityException
    */
-  @Test(timeout=300000) 
+  @Test(timeout=300000)
   public void testActiveThreadsCount() throws Exception{
     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
     List<Row> puts = constructPutRequests(); // creates a Put for every region
@@ -155,7 +151,7 @@ public class TestMultiParallel {
     table.close();
   }
 
-  @Test(timeout=300000) 
+  @Test(timeout=300000)
   public void testBatchWithGet() throws Exception {
     LOG.info("test=testBatchWithGet");
     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
@@ -237,7 +233,7 @@ public class TestMultiParallel {
    *
    * @throws Exception
    */
-  @Test (timeout=300000) 
+  @Test (timeout=300000)
   public void testFlushCommitsWithAbort() throws Exception {
     LOG.info("test=testFlushCommitsWithAbort");
     doTestFlushCommits(true);
@@ -262,7 +258,7 @@ public class TestMultiParallel {
     }
     LOG.info("puts");
     table.flushCommits();
-    int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
+    final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
         .size();
     assert liveRScount > 0;
     JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
@@ -304,6 +300,18 @@ public class TestMultiParallel {
       int regions = ProtobufUtil.getOnlineRegions(t.getRegionServer()).size();
       // Assert.assertTrue("Count of regions=" + regions, regions > 10);
     }
+    if (doAbort) {
+      UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
+      UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return UTIL.getMiniHBaseCluster().getMaster()
+              .getClusterStatus().getServersSize() == (liveRScount - 1);
+        }
+      });
+      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
+    }
+
     table.close();
     LOG.info("done");
   }
@@ -337,7 +345,7 @@ public class TestMultiParallel {
     table.close();
   }
 
-  @Test(timeout=300000) 
+  @Test(timeout=300000)
   public void testBatchWithDelete() throws Exception {
     LOG.info("test=testBatchWithDelete");
     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);