You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/27 00:09:24 UTC

[03/37] hbase git commit: HBASE-15232 Handle region location cache mgmt in AsyncProcess for multi()'s

HBASE-15232 Handle region location cache mgmt in AsyncProcess for multi()'s

Further investigation after HBASE-15221 led to the finding that
AsyncProcess should have been managing the contents of the region
location cache, appropriately clearing it when necessary (e.g. when an
RPC to a server fails because the server doesn't host that region).

For multi() RPCs, the tableName argument is null since there is no
single table that the updates are destined to. This inadvertently
caused the existing region location cache updates to fail on 1.x
branches. AsyncProcess needs to handle when tableName is null
and perform the necessary cache evictions.

As such, much of the new retry logic in HTableMultiplexer is
unnecessary and is removed with this commit. Getters that were
added as part of testing were left in place since they are mostly
harmless and should have no negative impact.

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a8073c4a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a8073c4a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a8073c4a

Branch: refs/heads/hbase-12439
Commit: a8073c4a9819953a2dd13a26bb4dd9405ac8750c
Parents: 5e50112
Author: Josh Elser <el...@apache.org>
Authored: Mon Feb 8 14:25:37 2016 -0500
Committer: stack <st...@apache.org>
Committed: Mon Feb 22 22:03:14 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  11 +-
 .../hadoop/hbase/client/HTableMultiplexer.java  |  23 ++--
 .../hbase/exceptions/ClientExceptionsUtil.java  |   6 +-
 .../client/TestHTableMultiplexerViaMocks.java   | 117 -------------------
 .../client/TestHTableMultiplexerFlushCache.java |  60 ++++++++++
 5 files changed, 81 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a8073c4a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 4ceb89a..65c15ce 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -1360,8 +1360,15 @@ class AsyncProcess {
           errorsByServer.reportServerError(server);
           canRetry = errorsByServer.canTryMore(numAttempt);
         }
-        connection.updateCachedLocations(
-            tableName, region, actions.get(0).getAction().getRow(), throwable, server);
+        if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
+          // For multi-actions, we don't have a table name, but we want to make sure to clear the
+          // cache in case there were location-related exceptions. We don't want to clear the
+          // cache for every possible exception that comes through, however.
+          connection.clearCaches(server);
+        } else {
+          connection.updateCachedLocations(
+              tableName, region, actions.get(0).getAction().getRow(), throwable, server);
+        }
         failureCount += actions.size();
 
         for (Action<Row> action : actions) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8073c4a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 13e9b85..f1bbcb3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -194,19 +194,6 @@ public class HTableMultiplexer {
    * @return true if the request can be accepted by its corresponding buffer queue.
    */
   public boolean put(final TableName tableName, final Put put, int maxAttempts) {
-    return _put(tableName, put, maxAttempts, false);
-  }
-
-  /**
-   * Internal "put" which exposes a boolean flag to control whether or not the region location
-   * cache should be reloaded when trying to queue the {@link Put}.
-   * @param tableName Destination table for the Put
-   * @param put The Put to send
-   * @param maxAttempts Number of attempts to retry the {@code put}
-   * @param reloadCache Should the region location cache be reloaded
-   * @return true if the request was accepted in the queue, otherwise false
-   */
-  boolean _put(final TableName tableName, final Put put, int maxAttempts, boolean reloadCache) {
     if (maxAttempts <= 0) {
       return false;
     }
@@ -215,7 +202,9 @@ public class HTableMultiplexer {
       HTable.validatePut(put, maxKeyValueSize);
       // Allow mocking to get at the connection, but don't expose the connection to users.
       ClusterConnection conn = (ClusterConnection) getConnection();
-      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache);
+      // AsyncProcess in the FlushWorker should take care of refreshing the location cache
+      // as necessary. We shouldn't have to do that here.
+      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
@@ -512,12 +501,16 @@ public class HTableMultiplexer {
         LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
       }
 
+      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
+      // the region location cache when the Put originally failed with some exception. If we keep
+      // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
+      // that we expect it to.
       getExecutor().schedule(new Runnable() {
         @Override
         public void run() {
           boolean succ = false;
           try {
-            succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true);
+            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
           } finally {
             FlushWorker.this.getRetryInQueue().decrementAndGet();
             if (!succ) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8073c4a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
index ebf1499..1d6f5d6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.exceptions;
 
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.RetryImmediatelyException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -40,14 +41,15 @@ public final class ClientExceptionsUtil {
     if (cur == null) {
       return true;
     }
-    return !isSpecialException(cur) || (cur instanceof RegionMovedException);
+    return !isSpecialException(cur) || (cur instanceof RegionMovedException)
+        || cur instanceof NotServingRegionException;
   }
 
   public static boolean isSpecialException(Throwable cur) {
     return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
         || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
         || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
-        || cur instanceof CallQueueTooBigException);
+        || cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8073c4a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
index 38ddeb9..7e68c21 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
@@ -16,34 +16,17 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.*;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker;
-import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -52,112 +35,19 @@ import static org.mockito.Mockito.when;
 @Category(SmallTests.class)
 public class TestHTableMultiplexerViaMocks {
 
-  private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
   private HTableMultiplexer mockMultiplexer;
   private ClusterConnection mockConnection;
-  private HRegionLocation mockRegionLocation;
-  private HRegionInfo mockRegionInfo;
-
-  private TableName tableName;
-  private Put put;
 
   @Before
   public void setupTest() {
     mockMultiplexer = mock(HTableMultiplexer.class);
     mockConnection = mock(ClusterConnection.class);
-    mockRegionLocation = mock(HRegionLocation.class);
-    mockRegionInfo = mock(HRegionInfo.class);
-
-    tableName = TableName.valueOf("my_table");
-    put = new Put(getBytes("row1"));
-    put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11"));
-    put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12"));
-    put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21"));
 
     // Call the real put(TableName, Put, int) method
     when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod();
 
     // Return the mocked ClusterConnection
     when(mockMultiplexer.getConnection()).thenReturn(mockConnection);
-
-    // Return the regionInfo from the region location
-    when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
-
-    // Make sure this RegionInfo points to our table
-    when(mockRegionInfo.getTable()).thenReturn(tableName);
-  }
-
-  @Test public void useCacheOnInitialPut() throws Exception {
-    mockMultiplexer.put(tableName, put, NUM_RETRIES);
-
-    verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false);
-  }
-
-  @Test public void nonNullLocationQueuesPut() throws Exception {
-    final LinkedBlockingQueue<PutStatus> queue = new LinkedBlockingQueue<>();
-
-    // Call the real method for _put(TableName, Put, int, boolean)
-    when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod();
-
-    // Return a region location
-    when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation);
-    when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue);
-
-    assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES));
-
-    assertEquals(1, queue.size());
-    final PutStatus ps = queue.take();
-    assertEquals(put, ps.put);
-    assertEquals(mockRegionInfo, ps.regionInfo);
-  }
-
-  @Test public void ignoreCacheOnRetriedPut() throws Exception {
-    FlushWorker mockFlushWorker = mock(FlushWorker.class);
-    ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
-    final AtomicInteger retryInQueue = new AtomicInteger(0);
-    final AtomicLong totalFailedPuts = new AtomicLong(0L);
-    final int maxRetryInQueue = 20;
-    final long delay = 100L;
-
-    final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES);
-
-    // Call the real resubmitFailedPut(PutStatus, HRegionLocation) method
-    when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod();
-    // Succeed on the re-submit without caching
-    when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true);
-
-    // Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation)
-    when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor);
-    when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay);
-    when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer);
-    when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue);
-    when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue);
-    when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts);
-
-    // When a Runnable is scheduled, run that Runnable
-    when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer(
-        new Answer<Void>() {
-          @Override
-          public Void answer(InvocationOnMock invocation) throws Throwable {
-            // Before we run this, should have one retry in progress.
-            assertEquals(1L, retryInQueue.get());
-
-            Object[] args = invocation.getArguments();
-            assertEquals(3, args.length);
-            assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable);
-            Runnable runnable = (Runnable) args[0];
-            runnable.run();
-            return null;
-          }
-        });
-
-    // The put should be rescheduled
-    assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation));
-
-    verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true);
-    assertEquals(0L, totalFailedPuts.get());
-    // Net result should be zero (added one before rerunning, subtracted one after running).
-    assertEquals(0L, retryInQueue.get());
   }
 
   @SuppressWarnings("deprecation")
@@ -183,11 +73,4 @@ public class TestHTableMultiplexerViaMocks {
     // We should not close it again
     verify(mockConnection, times(0)).close();
   }
-
-  /**
-   * @return UTF-8 byte representation for {@code str}
-   */
-  private static byte[] getBytes(String str) {
-    return str.getBytes(UTF_8);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8073c4a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
index 063e376..05c9caa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
@@ -24,16 +24,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @Category({ LargeTests.class, ClientTests.class })
@@ -115,4 +120,59 @@ public class TestHTableMultiplexerFlushCache {
       checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
     }
   }
+
+  @Test
+  public void testOnRegionMove() throws Exception {
+    // This test does nearly the same thing as testOnRegionChange, but avoids the
+    // potential to get a ConnectionClosingException. By moving the region, we can be certain that
+    // the connection is still valid and that the implementation is correctly handling an invalid
+    // Region cache (and not just tearing down the entire connection).
+    TableName TABLE = TableName.valueOf("testOnRegionMove");
+    final int NUM_REGIONS = 10;
+    HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
+      Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+
+    HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
+      PER_REGIONSERVER_QUEUE_SIZE);
+
+    final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
+    Pair<byte[][],byte[][]> startEndRows = regionLocator.getStartEndKeys();
+    byte[] row = startEndRows.getFirst()[1];
+    assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
+
+    Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1);
+    assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
+
+    checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
+
+    final HRegionLocation loc = regionLocator.getRegionLocation(row);
+    final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
+    // The current server for the region we're writing to
+    final ServerName originalServer = loc.getServerName();
+    ServerName newServer = null;
+    // Find a different server to move the region to (compare ServerName with ServerName)
+    for (int i = 0; i < SLAVES; i++) {
+      HRegionServer rs = hbaseCluster.getRegionServer(i);
+      if (!rs.getServerName().equals(originalServer)) {
+        newServer = rs.getServerName();
+        break;
+      }
+    }
+    assertNotNull("Did not find a new RegionServer to use", newServer);
+
+    // Move the region
+    LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer
+        + " to " + newServer);
+    TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
+        Bytes.toBytes(newServer.getServerName()));
+
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
+
+    // Send a new Put
+    put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2);
+    assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
+
+    // We should see the update make it to the new server eventually
+    checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
+  }
 }