You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/02/12 05:48:24 UTC
hbase git commit: HBASE-15221 HTableMultiplexer improvements (stale region locations and resource leaks) (Josh Elser)
Repository: hbase
Updated Branches:
refs/heads/branch-1.1 93c0764c5 -> 360a5782c
HBASE-15221 HTableMultiplexer improvements (stale region locations and resource leaks) (Josh Elser)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/360a5782
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/360a5782
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/360a5782
Branch: refs/heads/branch-1.1
Commit: 360a5782c19bd61df9e6b92cc2a72ff0260a082d
Parents: 93c0764
Author: tedyu <yu...@gmail.com>
Authored: Thu Feb 11 20:48:15 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Thu Feb 11 20:48:15 2016 -0800
----------------------------------------------------------------------
hbase-client/pom.xml | 5 +
.../hadoop/hbase/client/HTableMultiplexer.java | 111 +++++++++--
.../client/TestHTableMultiplexerViaMocks.java | 194 +++++++++++++++++++
3 files changed, 293 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/360a5782/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 6315496..bf36e66 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -183,6 +183,11 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hbase/blob/360a5782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 7d61a0b..1ba2b77 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -19,6 +19,9 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
@@ -50,8 +53,6 @@ import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/**
* HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
* Each put will be sharded into different buffer queues based on its destination region server.
@@ -97,7 +98,18 @@ public class HTableMultiplexer {
*/
public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
throws IOException {
- this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
+ this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
+ }
+
+ /**
+ * @param conn The HBase connection; it is cast to {@link ClusterConnection}, so it must be one.
+ * @param conf The HBase configuration
+ * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
+ * each region server before dropping the request.
+ */
+ public HTableMultiplexer(Connection conn, Configuration conf,
+ int perRegionServerBufferQueueSize) {
+ this.conn = (ClusterConnection) conn;
this.pool = HTable.getDefaultExecutor(conf);
this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -116,6 +128,18 @@ public class HTableMultiplexer {
}
/**
+ * Closes the internal {@link Connection}, including one the caller passed to the constructor.
+ * Does nothing if the {@link Connection} has already been closed.
+ * @throws IOException If there is an error closing the connection.
+ */
+ @SuppressWarnings("deprecation")
+ public synchronized void close() throws IOException {
+ if (!getConnection().isClosed()) {
+ getConnection().close();
+ }
+ }
+
+ /**
* The put request will be buffered by its corresponding buffer queue. Return false if the queue
* is already full.
* @param tableName
@@ -172,13 +196,28 @@ public class HTableMultiplexer {
* @throws IOException
*/
public boolean put(final TableName tableName, final Put put, int retry) {
+ return _put(tableName, put, retry, false);
+ }
+
+ /**
+ * Internal "put" which exposes a boolean flag to control whether or not the region location
+ * cache should be reloaded when trying to queue the {@link Put}.
+ * @param tableName Destination table for the Put
+ * @param put The Put to send
+ * @param retry Number of attempts to retry the {@code put}
+ * @param reloadCache Should the region location cache be reloaded
+ * @return true if the request was accepted in the queue, otherwise false
+ */
+ boolean _put(final TableName tableName, final Put put, int retry, boolean reloadCache) {
if (retry <= 0) {
return false;
}
try {
HTable.validatePut(put, maxKeyValueSize);
- HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
+ // Allow mocking to get at the connection, but don't expose the connection to users.
+ ClusterConnection conn = (ClusterConnection) getConnection();
+ HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache);
if (loc != null) {
// Add the put pair into its corresponding queue.
LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
@@ -217,7 +256,8 @@ public class HTableMultiplexer {
return new HTableMultiplexerStatus(serverToFlushWorkerMap);
}
- private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
+ @VisibleForTesting
+ LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
FlushWorker worker = serverToFlushWorkerMap.get(addr);
if (worker == null) {
synchronized (this.serverToFlushWorkerMap) {
@@ -234,6 +274,11 @@ public class HTableMultiplexer {
return worker.getQueue();
}
+ @VisibleForTesting
+ ClusterConnection getConnection() {
+ return this.conn;
+ }
+
/**
* HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
* report the number of buffered requests and the number of the failed (dropped) requests
@@ -342,7 +387,8 @@ public class HTableMultiplexer {
}
}
- private static class PutStatus {
+ @VisibleForTesting
+ static class PutStatus {
public final HRegionInfo regionInfo;
public final Put put;
public final int retryCount;
@@ -394,7 +440,8 @@ public class HTableMultiplexer {
}
}
- private static class FlushWorker implements Runnable {
+ @VisibleForTesting
+ static class FlushWorker implements Runnable {
private final HRegionLocation addr;
private final LinkedBlockingQueue<PutStatus> queue;
private final HTableMultiplexer multiplexer;
@@ -442,7 +489,7 @@ public class HTableMultiplexer {
return this.maxLatency.getAndSet(0);
}
- private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
+ boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
// Decrease the retry count
final int retryCount = ps.retryCount - 1;
@@ -451,10 +498,10 @@ public class HTableMultiplexer {
return false;
}
- int cnt = retryInQueue.incrementAndGet();
- if (cnt > maxRetryInQueue) {
+ int cnt = getRetryInQueue().incrementAndGet();
+ if (cnt > getMaxRetryInQueue()) {
// Too many Puts in queue for resubmit, give up this
- retryInQueue.decrementAndGet();
+ getRetryInQueue().decrementAndGet();
return false;
}
@@ -462,22 +509,21 @@ public class HTableMultiplexer {
// The currentPut is failed. So get the table name for the currentPut.
final TableName tableName = ps.regionInfo.getTable();
- long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
- multiplexer.retryNum - retryCount - 1);
+ long delayMs = getNextDelay(retryCount);
if (LOG.isDebugEnabled()) {
LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
}
- executor.schedule(new Runnable() {
+ getExecutor().schedule(new Runnable() {
@Override
public void run() {
boolean succ = false;
try {
- succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
+ succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true);
} finally {
- FlushWorker.this.retryInQueue.decrementAndGet();
+ FlushWorker.this.getRetryInQueue().decrementAndGet();
if (!succ) {
- FlushWorker.this.totalFailedPutCount.incrementAndGet();
+ FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
}
}
}
@@ -485,6 +531,37 @@ public class HTableMultiplexer {
return true;
}
+ @VisibleForTesting
+ long getNextDelay(int retryCount) {
+ return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
+ multiplexer.retryNum - retryCount - 1);
+ }
+
+ @VisibleForTesting
+ AtomicInteger getRetryInQueue() {
+ return this.retryInQueue;
+ }
+
+ @VisibleForTesting
+ int getMaxRetryInQueue() {
+ return this.maxRetryInQueue;
+ }
+
+ @VisibleForTesting
+ AtomicLong getTotalFailedPutCount() {
+ return this.totalFailedPutCount;
+ }
+
+ @VisibleForTesting
+ HTableMultiplexer getMultiplexer() {
+ return this.multiplexer;
+ }
+
+ @VisibleForTesting
+ ScheduledExecutorService getExecutor() {
+ return this.executor;
+ }
+
@Override
public void run() {
int failedCount = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/360a5782/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
new file mode 100644
index 0000000..8e0b9a7
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker;
+import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@Category(SmallTests.class)
+public class TestHTableMultiplexerViaMocks {
+
+ private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
+ private HTableMultiplexer mockMultiplexer;
+ private ClusterConnection mockConnection;
+ private HRegionLocation mockRegionLocation;
+ private HRegionInfo mockRegionInfo;
+
+ private TableName tableName;
+ private Put put;
+
+ @Before
+ public void setupTest() {
+ mockMultiplexer = mock(HTableMultiplexer.class);
+ mockConnection = mock(ClusterConnection.class);
+ mockRegionLocation = mock(HRegionLocation.class);
+ mockRegionInfo = mock(HRegionInfo.class);
+
+ tableName = TableName.valueOf("my_table");
+ put = new Put(getBytes("row1"));
+ put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11"));
+ put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12"));
+ put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21"));
+
+ // Call the real put(TableName, Put, int) method
+ when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod();
+
+ // Return the mocked ClusterConnection
+ when(mockMultiplexer.getConnection()).thenReturn(mockConnection);
+
+ // Return the regionInfo from the region location
+ when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
+
+ // Make sure this RegionInfo points to our table
+ when(mockRegionInfo.getTable()).thenReturn(tableName);
+ }
+
+ @Test public void useCacheOnInitialPut() throws Exception {
+ mockMultiplexer.put(tableName, put, NUM_RETRIES);
+
+ verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false);
+ }
+
+ @Test public void nonNullLocationQueuesPut() throws Exception {
+ final LinkedBlockingQueue<PutStatus> queue = new LinkedBlockingQueue<>();
+
+ // Call the real method for _put(TableName, Put, int, boolean)
+ when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod();
+
+ // Return a region location
+ when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation);
+ when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue);
+
+ assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES));
+
+ assertEquals(1, queue.size());
+ final PutStatus ps = queue.take();
+ assertEquals(put, ps.put);
+ assertEquals(mockRegionInfo, ps.regionInfo);
+ }
+
+ @Test public void ignoreCacheOnRetriedPut() throws Exception {
+ FlushWorker mockFlushWorker = mock(FlushWorker.class);
+ ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
+ final AtomicInteger retryInQueue = new AtomicInteger(0);
+ final AtomicLong totalFailedPuts = new AtomicLong(0L);
+ final int maxRetryInQueue = 20;
+ final long delay = 100L;
+
+ final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES);
+
+ // Call the real resubmitFailedPut(PutStatus, HRegionLocation) method
+ when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod();
+ // Succeed on the re-submit, which must bypass the cached region location (reloadCache=true)
+ when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true);
+
+ // Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation)
+ when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor);
+ when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay);
+ when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer);
+ when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue);
+ when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue);
+ when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts);
+
+ // When a Runnable is scheduled, run that Runnable
+ when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ // Before we run this, should have one retry in progress.
+ assertEquals(1L, retryInQueue.get());
+
+ Object[] args = invocation.getArguments();
+ assertEquals(3, args.length);
+ assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable);
+ Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return null;
+ }
+ });
+
+ // The put should be rescheduled
+ assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation));
+
+ verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true);
+ assertEquals(0L, totalFailedPuts.get());
+ // Net result should be zero (added one before rerunning, subtracted one after running).
+ assertEquals(0L, retryInQueue.get());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test public void testConnectionClosing() throws IOException {
+ doCallRealMethod().when(mockMultiplexer).close();
+ // If the connection is not closed
+ when(mockConnection.isClosed()).thenReturn(false);
+
+ mockMultiplexer.close();
+
+ // We should close it
+ verify(mockConnection).close();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test public void testClosingAlreadyClosedConnection() throws IOException {
+ doCallRealMethod().when(mockMultiplexer).close();
+ // If the connection is already closed
+ when(mockConnection.isClosed()).thenReturn(true);
+
+ mockMultiplexer.close();
+
+ // We should not close it again
+ verify(mockConnection, times(0)).close();
+ }
+
+ /**
+ * @return UTF-8 byte representation for {@code str}
+ */
+ private static byte[] getBytes(String str) {
+ return str.getBytes(UTF_8);
+ }
+}
+