You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/02/12 05:48:24 UTC

hbase git commit: HBASE-15221 HTableMultiplexer improvements (stale region locations and resource leaks) (Josh Elser)

Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 93c0764c5 -> 360a5782c


HBASE-15221 HTableMultiplexer improvements (stale region locations and resource leaks) (Josh Elser)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/360a5782
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/360a5782
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/360a5782

Branch: refs/heads/branch-1.1
Commit: 360a5782c19bd61df9e6b92cc2a72ff0260a082d
Parents: 93c0764
Author: tedyu <yu...@gmail.com>
Authored: Thu Feb 11 20:48:15 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Thu Feb 11 20:48:15 2016 -0800

----------------------------------------------------------------------
 hbase-client/pom.xml                            |   5 +
 .../hadoop/hbase/client/HTableMultiplexer.java  | 111 +++++++++--
 .../client/TestHTableMultiplexerViaMocks.java   | 194 +++++++++++++++++++
 3 files changed, 293 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/360a5782/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 6315496..bf36e66 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -183,6 +183,11 @@
       <artifactId>log4j</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hbase/blob/360a5782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 7d61a0b..1ba2b77 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -19,6 +19,9 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
@@ -50,8 +53,6 @@ import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
  * Each put will be sharded into different buffer queues based on its destination region server.
@@ -97,7 +98,18 @@ public class HTableMultiplexer {
    */
   public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
       throws IOException {
-    this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
+    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
+  }
+
+  /**
+   * @param conn The HBase connection.
+   * @param conf The HBase configuration
+   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
+   *          each region server before dropping the request.
+   */
+  public HTableMultiplexer(Connection conn, Configuration conf,
+      int perRegionServerBufferQueueSize) {
+    this.conn = (ClusterConnection) conn;
     this.pool = HTable.getDefaultExecutor(conf);
     this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -116,6 +128,18 @@ public class HTableMultiplexer {
   }
 
   /**
+   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
+   * been closed.
+   * @throws IOException If there is an error closing the connection.
+   */
+  @SuppressWarnings("deprecation")
+  public synchronized void close() throws IOException {
+    if (!getConnection().isClosed()) {
+      getConnection().close();
+    }
+  }
+
+  /**
    * The put request will be buffered by its corresponding buffer queue. Return false if the queue
    * is already full.
    * @param tableName
@@ -172,13 +196,28 @@ public class HTableMultiplexer {
    * @throws IOException
    */
   public boolean put(final TableName tableName, final Put put, int retry) {
+    return _put(tableName, put, retry, false);
+  }
+
+  /**
+   * Internal "put" which exposes a boolean flag to control whether or not the region location
+   * cache should be reloaded when trying to queue the {@link Put}.
+   * @param tableName Destination table for the Put
+   * @param put The Put to send
+   * @param retry Number of attempts to retry the {@code put}
+   * @param reloadCache Should the region location cache be reloaded
+   * @return true if the request was accepted in the queue, otherwise false
+   */
+  boolean _put(final TableName tableName, final Put put, int retry, boolean reloadCache) {
     if (retry <= 0) {
       return false;
     }
 
     try {
       HTable.validatePut(put, maxKeyValueSize);
-      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
+      // Allow mocking to get at the connection, but don't expose the connection to users.
+      ClusterConnection conn = (ClusterConnection) getConnection();
+      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
@@ -217,7 +256,8 @@ public class HTableMultiplexer {
     return new HTableMultiplexerStatus(serverToFlushWorkerMap);
   }
 
-  private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
+  @VisibleForTesting
+  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
     FlushWorker worker = serverToFlushWorkerMap.get(addr);
     if (worker == null) {
       synchronized (this.serverToFlushWorkerMap) {
@@ -234,6 +274,11 @@ public class HTableMultiplexer {
     return worker.getQueue();
   }
 
+  @VisibleForTesting
+  ClusterConnection getConnection() {
+    return this.conn;
+  }
+
   /**
    * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
    * report the number of buffered requests and the number of the failed (dropped) requests
@@ -342,7 +387,8 @@ public class HTableMultiplexer {
     }
   }
 
-  private static class PutStatus {
+  @VisibleForTesting
+  static class PutStatus {
     public final HRegionInfo regionInfo;
     public final Put put;
     public final int retryCount;
@@ -394,7 +440,8 @@ public class HTableMultiplexer {
     }
   }
 
-  private static class FlushWorker implements Runnable {
+  @VisibleForTesting
+  static class FlushWorker implements Runnable {
     private final HRegionLocation addr;
     private final LinkedBlockingQueue<PutStatus> queue;
     private final HTableMultiplexer multiplexer;
@@ -442,7 +489,7 @@ public class HTableMultiplexer {
       return this.maxLatency.getAndSet(0);
     }
 
-    private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
+    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
       // Decrease the retry count
       final int retryCount = ps.retryCount - 1;
 
@@ -451,10 +498,10 @@ public class HTableMultiplexer {
         return false;
       }
 
-      int cnt = retryInQueue.incrementAndGet();
-      if (cnt > maxRetryInQueue) {
+      int cnt = getRetryInQueue().incrementAndGet();
+      if (cnt > getMaxRetryInQueue()) {
         // Too many Puts in queue for resubmit, give up this
-        retryInQueue.decrementAndGet();
+        getRetryInQueue().decrementAndGet();
         return false;
       }
 
@@ -462,22 +509,21 @@ public class HTableMultiplexer {
       // The currentPut is failed. So get the table name for the currentPut.
       final TableName tableName = ps.regionInfo.getTable();
 
-      long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
-        multiplexer.retryNum - retryCount - 1);
+      long delayMs = getNextDelay(retryCount);
       if (LOG.isDebugEnabled()) {
         LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
       }
 
-      executor.schedule(new Runnable() {
+      getExecutor().schedule(new Runnable() {
         @Override
         public void run() {
           boolean succ = false;
           try {
-            succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
+            succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true);
           } finally {
-            FlushWorker.this.retryInQueue.decrementAndGet();
+            FlushWorker.this.getRetryInQueue().decrementAndGet();
             if (!succ) {
-              FlushWorker.this.totalFailedPutCount.incrementAndGet();
+              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
             }
           }
         }
@@ -485,6 +531,37 @@ public class HTableMultiplexer {
       return true;
     }
 
+    @VisibleForTesting
+    long getNextDelay(int retryCount) {
+      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
+          multiplexer.retryNum - retryCount - 1);
+    }
+
+    @VisibleForTesting
+    AtomicInteger getRetryInQueue() {
+      return this.retryInQueue;
+    }
+
+    @VisibleForTesting
+    int getMaxRetryInQueue() {
+      return this.maxRetryInQueue;
+    }
+
+    @VisibleForTesting
+    AtomicLong getTotalFailedPutCount() {
+      return this.totalFailedPutCount;
+    }
+
+    @VisibleForTesting
+    HTableMultiplexer getMultiplexer() {
+      return this.multiplexer;
+    }
+
+    @VisibleForTesting
+    ScheduledExecutorService getExecutor() {
+      return this.executor;
+    }
+
     @Override
     public void run() {
       int failedCount = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/360a5782/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
new file mode 100644
index 0000000..8e0b9a7
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker;
+import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@Category(SmallTests.class)
+public class TestHTableMultiplexerViaMocks {
+
+  private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
+  private HTableMultiplexer mockMultiplexer;
+  private ClusterConnection mockConnection;
+  private HRegionLocation mockRegionLocation;
+  private HRegionInfo mockRegionInfo;
+
+  private TableName tableName;
+  private Put put;
+
+  @Before
+  public void setupTest() {
+    mockMultiplexer = mock(HTableMultiplexer.class);
+    mockConnection = mock(ClusterConnection.class);
+    mockRegionLocation = mock(HRegionLocation.class);
+    mockRegionInfo = mock(HRegionInfo.class);
+
+    tableName = TableName.valueOf("my_table");
+    put = new Put(getBytes("row1"));
+    put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11"));
+    put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12"));
+    put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21"));
+
+    // Call the real put(TableName, Put, int) method
+    when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod();
+
+    // Return the mocked ClusterConnection
+    when(mockMultiplexer.getConnection()).thenReturn(mockConnection);
+
+    // Return the regionInfo from the region location
+    when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
+
+    // Make sure this RegionInfo points to our table
+    when(mockRegionInfo.getTable()).thenReturn(tableName);
+  }
+
+  @Test public void useCacheOnInitialPut() throws Exception {
+    mockMultiplexer.put(tableName, put, NUM_RETRIES);
+
+    verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false);
+  }
+
+  @Test public void nonNullLocationQueuesPut() throws Exception {
+    final LinkedBlockingQueue<PutStatus> queue = new LinkedBlockingQueue<>();
+
+    // Call the real method for _put(TableName, Put, int, boolean)
+    when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod();
+
+    // Return a region location
+    when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation);
+    when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue);
+
+    assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES));
+
+    assertEquals(1, queue.size());
+    final PutStatus ps = queue.take();
+    assertEquals(put, ps.put);
+    assertEquals(mockRegionInfo, ps.regionInfo);
+  }
+
+  @Test public void ignoreCacheOnRetriedPut() throws Exception {
+    FlushWorker mockFlushWorker = mock(FlushWorker.class);
+    ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
+    final AtomicInteger retryInQueue = new AtomicInteger(0);
+    final AtomicLong totalFailedPuts = new AtomicLong(0L);
+    final int maxRetryInQueue = 20;
+    final long delay = 100L;
+
+    final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES);
+
+    // Call the real resubmitFailedPut(PutStatus, HRegionLocation) method
+    when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod();
+    // Succeed on the re-submit without caching
+    when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true);
+
+    // Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation)
+    when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor);
+    when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay);
+    when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer);
+    when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue);
+    when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue);
+    when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts);
+
+    // When a Runnable is scheduled, run that Runnable
+    when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // Before we run this, should have one retry in progress.
+            assertEquals(1L, retryInQueue.get());
+
+            Object[] args = invocation.getArguments();
+            assertEquals(3, args.length);
+            assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable);
+            Runnable runnable = (Runnable) args[0];
+            runnable.run();
+            return null;
+          }
+        });
+
+    // The put should be rescheduled
+    assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation));
+
+    verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true);
+    assertEquals(0L, totalFailedPuts.get());
+    // Net result should be zero (added one before rerunning, subtracted one after running).
+    assertEquals(0L, retryInQueue.get());
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test public void testConnectionClosing() throws IOException {
+    doCallRealMethod().when(mockMultiplexer).close();
+    // If the connection is not closed
+    when(mockConnection.isClosed()).thenReturn(false);
+
+    mockMultiplexer.close();
+
+    // We should close it
+    verify(mockConnection).close();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test public void testClosingAlreadyClosedConnection() throws IOException {
+    doCallRealMethod().when(mockMultiplexer).close();
+    // If the connection is already closed
+    when(mockConnection.isClosed()).thenReturn(true);
+
+    mockMultiplexer.close();
+
+    // We should not close it again
+    verify(mockConnection, times(0)).close();
+  }
+
+  /**
+   * @return UTF-8 byte representation for {@code str}
+   */
+  private static byte[] getBytes(String str) {
+    return str.getBytes(UTF_8);
+  }
+}
+