You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/05/16 20:18:35 UTC
svn commit: r1595280 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
test/java/org/apache/hadoop/hbase/client/
test/java/org/apache/hadoop/hbase/mock/
Author: liyin
Date: Fri May 16 18:18:35 2014
New Revision: 1595280
URL: http://svn.apache.org/r1595280
Log:
[HBASE-11187] Limit the number of client threads per regionserver
Author: elliott
Summary:
When many HTables are created and a region server becomes slow to respond,
the client creates lots of threads in the HConnection (TableServers).
This patch adds a config (hbase.client.max.outstanding.requests.per.server)
that limits the number of threads per region server that the HBase client
can spawn. Above that limit, requests fail fast.
Test Plan: verynicetests
Reviewers: aaiyer, rshroff
Reviewed By: rshroff
Subscribers: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1331985
Tasks: 4330194
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1595280&r1=1595279&r2=1595280&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Fri May 16 18:18:35 2014
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -286,7 +287,7 @@ public interface HConnection extends Clo
* Or, operations were not successfully completed.
*/
public void processBatchedGets(List<Get> actions, StringBytes tableName,
- ExecutorService pool, Result[] results, HBaseRPCOptions options)
+ ListeningExecutorService pool, Result[] results, HBaseRPCOptions options)
throws IOException, InterruptedException;
/**
@@ -303,7 +304,7 @@ public interface HConnection extends Clo
* META. Or, operations were not successfully completed.
*/
public void processBatchedMutations(List<Mutation> actions,
- StringBytes tableName, ExecutorService pool, List<Mutation> failures,
+ StringBytes tableName, ListeningExecutorService pool, List<Mutation> failures,
HBaseRPCOptions options) throws IOException, InterruptedException;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1595280&r1=1595279&r2=1595280&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri May 16 18:18:35 2014
@@ -38,6 +38,8 @@ import java.util.concurrent.SynchronousQ
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -96,11 +98,12 @@ public class HTable implements HTableInt
// Share this multiaction thread pool across all the HTable instance;
// The total number of threads will be bounded #HTable * #RegionServer.
- static ExecutorService multiActionThreadPool =
- new ThreadPoolExecutor(1, Integer.MAX_VALUE,
+ static ExecutorService multiActionThreadPool = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory("htable-thread-"));
+
+ static ListeningExecutorService listeningMultiActionPool = MoreExecutors.listeningDecorator(multiActionThreadPool);
static {
((ThreadPoolExecutor)multiActionThreadPool).allowCoreThreadTimeOut(true);
}
@@ -798,7 +801,7 @@ public class HTable implements HTableInt
throws IOException {
Result[] results = new Result[actions.size()];
try {
- this.getConnectionAndResetOperationContext().processBatchedGets(actions, tableName, multiActionThreadPool,
+ this.getConnectionAndResetOperationContext().processBatchedGets(actions, tableName, listeningMultiActionPool,
results, this.options);
} catch (Exception e) {
e.printStackTrace();
@@ -815,7 +818,7 @@ public class HTable implements HTableInt
throws IOException {
try {
this.getConnectionAndResetOperationContext().processBatchedMutations(actions,
- tableName, multiActionThreadPool, null, this.options);
+ tableName, listeningMultiActionPool, null, this.options);
} catch (Exception e) {
throw new IOException(e);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java?rev=1595280&r1=1595279&r2=1595280&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java Fri May 16 18:18:35 2014
@@ -45,12 +45,18 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -93,6 +99,8 @@ import org.apache.thrift.transport.TTran
import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
/* Encapsulates finding the servers for an HBase instance */
public class TableServers implements ServerConnection {
static final Log LOG = LogFactory.getLog(TableServers.class);
@@ -100,6 +108,7 @@ public class TableServers implements Ser
private final int prefetchRegionLimit;
private final Object masterLock = new Object();
+ private final int maxOutstandRequestsPerServer;
private volatile boolean closed;
private volatile HMasterInterface master;
private volatile boolean masterChecked;
@@ -138,7 +147,9 @@ public class TableServers implements Ser
private long fastFailClearingTimeMilliSec;
private final boolean recordClientContext;
- private ThreadLocal<List<OperationContext>> operationContextPerThread = new ThreadLocal<List<OperationContext>>();
+ private ThreadLocal<List<OperationContext>> operationContextPerThread = new ThreadLocal<>();
+
+ private final ConcurrentHashMap<HServerAddress, AtomicInteger> outstandingRequests = new ConcurrentHashMap<>();
@Override
public void resetOperationContext() {
@@ -285,6 +296,7 @@ public class TableServers implements Ser
this.recordClientContext = conf.getBoolean("hbase.client.record.context", false);
+ this.maxOutstandRequestsPerServer = conf.getInt("hbase.client.max.outstanding.requests.per.server", 50);
}
// Used by master and region servers during safe mode only
@@ -1895,23 +1907,30 @@ private HRegionLocation locateMetaInRoot
return actionsByServer;
}
- private Map<HServerAddress, Future<MultiResponse>> makeServerRequests(
+ private Map<HServerAddress, ListenableFuture<MultiResponse>> makeServerRequests(
Map<HServerAddress, MultiAction> actionsByServer,
- final byte[] tableName, ExecutorService pool, HBaseRPCOptions options) {
+ final byte[] tableName, ListeningExecutorService pool, HBaseRPCOptions options) {
- Map<HServerAddress, Future<MultiResponse>> futures = new HashMap<HServerAddress, Future<MultiResponse>>(
+ Map<HServerAddress, ListenableFuture<MultiResponse>> futures = new HashMap<>(
actionsByServer.size());
boolean singleServer = (actionsByServer.size() == 1);
for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
Callable<MultiResponse> callable = createMultiActionCallable(
e.getKey(), e.getValue(), tableName, options);
- Future<MultiResponse> task;
- if (singleServer) {
- task = new FutureTask<MultiResponse>(callable);
- ((FutureTask<MultiResponse>) task).run();
- } else {
- task = pool.submit(callable);
+ ListenableFuture<MultiResponse> task;
+
+ try {
+ validateAndIncrementNumOutstandingPerServer(e.getKey());
+ if (singleServer) {
+ task = ListenableFutureTask.create(callable);
+ ((FutureTask<MultiResponse>) task).run();
+ } else {
+ task = pool.submit(callable);
+ }
+ Futures.addCallback(task, new OutstandingRequestCallback(e.getKey()));
+ } catch (IOException e1) {
+ task = Futures.immediateFailedFuture(e1);
}
futures.put(e.getKey(), task);
}
@@ -1925,12 +1944,12 @@ private HRegionLocation locateMetaInRoot
private List<Mutation> collectResponsesForMutateFromAllRS(
StringBytes tableName,
Map<HServerAddress, MultiAction> actionsByServer,
- Map<HServerAddress, Future<MultiResponse>> futures,
+ Map<HServerAddress, ListenableFuture<MultiResponse>> futures,
Map<String, HRegionFailureInfo> failureInfo)
throws InterruptedException, IOException {
List<Mutation> newWorkingList = null;
- for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
+ for (Entry<HServerAddress, ListenableFuture<MultiResponse>> responsePerServer : futures
.entrySet()) {
HServerAddress address = responsePerServer.getKey();
MultiAction request = actionsByServer.get(address);
@@ -1972,13 +1991,13 @@ private HRegionLocation locateMetaInRoot
*/
private List<Get> collectResponsesForGetFromAllRS(StringBytes tableName,
Map<HServerAddress, MultiAction> actionsByServer,
- Map<HServerAddress, Future<MultiResponse>> futures,
+ Map<HServerAddress, ListenableFuture<MultiResponse>> futures,
List<Get> orig_list, Result[] results,
Map<String, HRegionFailureInfo> failureInfo) throws IOException,
InterruptedException {
List<Get> newWorkingList = null;
- for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
+ for (Entry<HServerAddress, ListenableFuture<MultiResponse>> responsePerServer : futures
.entrySet()) {
HServerAddress address = responsePerServer.getKey();
MultiAction request = actionsByServer.get(address);
@@ -2116,7 +2135,7 @@ private HRegionLocation locateMetaInRoot
@Override
public void processBatchedMutations(List<Mutation> orig_list,
- StringBytes tableName, ExecutorService pool, List<Mutation> failures,
+ StringBytes tableName, ListeningExecutorService pool, List<Mutation> failures,
HBaseRPCOptions options) throws IOException, InterruptedException {
// Keep track of the most recent servers for any given item for better
@@ -2146,7 +2165,7 @@ private HRegionLocation locateMetaInRoot
workingList, tableName, false);
// step 2: make the requests
- Map<HServerAddress, Future<MultiResponse>> futures = makeServerRequests(
+ Map<HServerAddress, ListenableFuture<MultiResponse>> futures = makeServerRequests(
actionsByServer, tableName.getBytes(), pool, options);
// step 3: collect the failures and successes and prepare for retry
@@ -2165,7 +2184,7 @@ private HRegionLocation locateMetaInRoot
@Override
public void processBatchedGets(List<Get> orig_list, StringBytes tableName,
- ExecutorService pool, Result[] results, HBaseRPCOptions options)
+ ListeningExecutorService pool, Result[] results, HBaseRPCOptions options)
throws IOException, InterruptedException {
Map<String, HRegionFailureInfo> failureInfo = new HashMap<String, HRegionFailureInfo>();
@@ -2197,7 +2216,7 @@ private HRegionLocation locateMetaInRoot
workingList, tableName, true);
// step 2: make the requests
- Map<HServerAddress, Future<MultiResponse>> futures = makeServerRequests(
+ Map<HServerAddress, ListenableFuture<MultiResponse>> futures = makeServerRequests(
actionsByServer, tableName.getBytes(), pool, options);
// step 3: collect the failures and successes and prepare for retry
@@ -2520,21 +2539,28 @@ private HRegionLocation locateMetaInRoot
Map<String, HRegionFailureInfo> failedRegionsInfo) throws IOException {
List<Put> failed = null;
- List<Future<MultiPutResponse>> futures = new ArrayList<Future<MultiPutResponse>>(
+ List<ListenableFuture<MultiPutResponse>> futures = new ArrayList<>(
multiPuts.size());
boolean singleServer = (multiPuts.size() == 1);
for (MultiPut put : multiPuts) {
Callable<MultiPutResponse> callable = createPutCallable(put.address,
put, options);
- Future<MultiPutResponse> task;
- if (singleServer) {
- FutureTask<MultiPutResponse> futureTask = new FutureTask<MultiPutResponse>(
- callable);
- task = futureTask;
- futureTask.run();
- } else {
- task = HTable.multiActionThreadPool.submit(callable);
+ ListenableFuture<MultiPutResponse> task;
+ try {
+ validateAndIncrementNumOutstandingPerServer(put.address);
+ if (singleServer) {
+ ListenableFutureTask<MultiPutResponse> futureTask = ListenableFutureTask.create(
+ callable);
+ task = futureTask;
+ futureTask.run();
+ } else {
+ task = HTable.listeningMultiActionPool.submit(callable);
+ }
+ Futures.addCallback(task, new OutstandingRequestCallback(put.address));
+ } catch (IOException e1) {
+ task = Futures.immediateFailedFuture(e1);
}
+
futures.add(task);
}
@@ -3329,8 +3355,49 @@ private HRegionLocation locateMetaInRoot
return regionFlushTimesMap;
}
+  /**
+   * Reserves one outstanding-request slot for {@code address}, failing fast
+   * when the server already has more than
+   * {@code maxOutstandRequestsPerServer} requests outstanding.
+   *
+   * The limit check and the increment happen in a single compare-and-set
+   * step, so concurrent callers cannot all pass the check on the same stale
+   * value and overshoot the limit together.
+   *
+   * @param address the region server the request is about to be sent to
+   * @throws TooManyOutstandingRequestsException if the current count for
+   *         {@code address} exceeds the configured maximum; the counter is
+   *         left unchanged in that case
+   */
+  void validateAndIncrementNumOutstandingPerServer(HServerAddress address)
+      throws TooManyOutstandingRequestsException {
+    AtomicInteger atomicOutstanding = outstandingRequests.computeIfAbsent(address,
+        new Function<HServerAddress, AtomicInteger>() {
+          @Override public AtomicInteger apply(HServerAddress key) {
+            return new AtomicInteger(0);
+          }
+        });
+
+    // NOTE(review): the comparison is strict, so a request is still admitted
+    // when the count equals the configured maximum. That threshold is kept
+    // as-is here -- confirm whether ">=" was intended.
+    while (true) {
+      int outstanding = atomicOutstanding.get();
+      if (outstanding > maxOutstandRequestsPerServer) {
+        throw new TooManyOutstandingRequestsException(address, outstanding);
+      }
+      // Only take the slot if nobody changed the count since it was read;
+      // otherwise re-read and re-check against the limit.
+      if (atomicOutstanding.compareAndSet(outstanding, outstanding + 1)) {
+        return;
+      }
+    }
+  }
+
+  /**
+   * Gives back one outstanding-request slot for {@code address}.
+   *
+   * If no counter is registered for the address yet, a counter starting at
+   * zero is registered first and then decremented.
+   *
+   * @param address the region server whose request has completed
+   */
+  void decrementNumOutstandingPerServer(HServerAddress address) {
+    AtomicInteger counter = outstandingRequests.get(address);
+    if (counter == null) {
+      AtomicInteger fresh = new AtomicInteger(0);
+      // Another thread may register a counter in between; use whichever
+      // instance ended up in the map.
+      AtomicInteger existing = outstandingRequests.putIfAbsent(address, fresh);
+      counter = (existing == null) ? fresh : existing;
+    }
+    counter.decrementAndGet();
+  }
+
@Override
public Configuration getConf() {
return this.conf;
}
+
+  /**
+   * Future callback that decrements the outstanding-request count of one
+   * server when the future it is attached to completes, whether the future
+   * succeeded or failed.
+   *
+   * @param <V> result type of the future; the result itself is ignored
+   */
+  private class OutstandingRequestCallback<V> implements FutureCallback<V> {
+
+    private final HServerAddress server;
+
+    public OutstandingRequestCallback(HServerAddress server) {
+      this.server = server;
+    }
+
+    @Override
+    public void onSuccess(@Nullable V result) {
+      release();
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      release();
+    }
+
+    /** Decrements the count for the server this callback was created for. */
+    private void release() {
+      decrementNumOutstandingPerServer(server);
+    }
+  }
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java?rev=1595280&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java Fri May 16 18:18:35 2014
@@ -0,0 +1,37 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HServerAddress;
+
+
+/**
+ * Signals that a request was failed on the client because the target server
+ * already had too many requests outstanding.
+ */
+public class TooManyOutstandingRequestsException extends ClientSideDoNotRetryException {
+  private final HServerAddress server;
+
+  /**
+   * @param server the server the request was destined for
+   * @param numCurrentlyOutstanding number of requests outstanding to that
+   *        server when the request was refused
+   */
+  public TooManyOutstandingRequestsException(HServerAddress server, int numCurrentlyOutstanding) {
+    super(describe(server, numCurrentlyOutstanding));
+    this.server = server;
+  }
+
+  /** @return the server the refused request was destined for */
+  public HServerAddress getServer() {
+    return server;
+  }
+
+  /** Builds the exception message from the server and its outstanding count. */
+  private static String describe(HServerAddress server, int numCurrentlyOutstanding) {
+    StringBuilder text = new StringBuilder("Server ");
+    text.append(server);
+    text.append(" has ");
+    text.append(numCurrentlyOutstanding);
+    text.append(" outstanding requests. ");
+    text.append("Failing fast to keep from creating more threads.");
+    return text.toString();
+  }
+}
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java?rev=1595280&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java Fri May 16 18:18:35 2014
@@ -0,0 +1,160 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Exercises the client-side limit on outstanding requests per server.
+ *
+ * The mini cluster is started with
+ * {@code hbase.client.max.outstanding.requests.per.server} set to 5. The
+ * test first runs {@link #MAX_WRITER_THREADS} concurrent writers and expects
+ * at least one of them to fail with an {@link IOException}, then runs only
+ * four writers and expects none of them to fail.
+ */
+@Category(MediumTests.class)
+public class TestLoadShedding {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Writers keep looping while this is true.
+  private static final AtomicBoolean RUNNING = new AtomicBoolean(true);
+  // Number of writer threads that terminated with an IOException.
+  private static final AtomicLong NUM_SHED = new AtomicLong(0);
+  private static final String TABLE_NAME = "testLoadShedding";
+  private static final String FAMILY_NAME_STR = "d";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+  public static final int MAX_WRITER_THREADS = 15;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt("hbase.client.max.outstanding.requests.per.server", 5);
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.waitForTableConsistent();
+    HBaseAdmin hba = TEST_UTIL.getHBaseAdmin();
+    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+    htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
+    hba.createTable(htd, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 10);
+    RUNNING.set(true);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    RUNNING.set(false);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Runs many writers (expecting shed requests), then a few writers
+   * (expecting none).
+   *
+   * @throws InterruptedException if the test thread is interrupted while
+   *         sleeping or joining; propagated so the interrupt is not lost
+   */
+  @Test
+  public void testLoadSheddingWrites() throws InterruptedException {
+
+    // Start more writer threads than the configured limit of 5 outstanding
+    // requests so that some of them get shed.
+    List<Thread> threads = startWriters(0, MAX_WRITER_THREADS);
+
+    Thread.sleep(5000);
+
+    RUNNING.set(false);
+    assertTrue(NUM_SHED.get() > 0);
+
+    // Now wait for everything to stop. A joined thread has already
+    // terminated, so no Thread.stop() is needed afterwards.
+    joinAll(threads);
+
+    // Clean up
+    NUM_SHED.set(0);
+    RUNNING.set(true);
+
+    // Make sure that just a few threads don't contend
+    List<Thread> fewThreads = startWriters(MAX_WRITER_THREADS, 4);
+
+    Thread.sleep(5000);
+
+    RUNNING.set(false);
+    assertEquals(0, NUM_SHED.get());
+
+    // Let the remaining writers finish before the cluster is torn down.
+    joinAll(fewThreads);
+  }
+
+  /** Starts {@code count} daemon writer threads numbered from {@code firstId}. */
+  private static List<Thread> startWriters(int firstId, int count) {
+    List<Thread> started = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      Thread t = new WriterThread(firstId + i);
+      t.setDaemon(true);
+      t.start();
+      started.add(t);
+    }
+    return started;
+  }
+
+  /** Waits for every given thread to terminate. */
+  private static void joinAll(List<Thread> threads) throws InterruptedException {
+    for (Thread t : threads) {
+      t.join();
+    }
+  }
+
+  protected static Put generateRandomPut() {
+    Put p = new Put(Bytes.toBytes(RandomStringUtils.randomAlphabetic(30)));
+    p.add(FAMILY_NAME, Bytes.toBytes(RandomUtils.nextInt()), Bytes.toBytes(
+        RandomStringUtils.randomAlphanumeric(10)));
+    return p;
+  }
+
+  /**
+   * Writer that buffers seven random puts and flushes them, repeating while
+   * {@code RUNNING} is set. Any IOException ends the thread and is counted
+   * in {@code NUM_SHED}.
+   */
+  private static class WriterThread extends Thread {
+    public WriterThread(int i) {
+      super("client-writer-" + i);
+    }
+
+    @Override
+    public void run() {
+      try {
+        HTable ht = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
+        ht.setAutoFlush(false);
+
+        while (RUNNING.get()) {
+          for (int i = 0; i < 7; i++) {
+            ht.put(generateRandomPut());
+          }
+          ht.flushCommits();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        NUM_SHED.incrementAndGet();
+      }
+    }
+  }
+}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java?rev=1595280&r1=1595279&r2=1595280&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java Fri May 16 18:18:35 2014
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -227,7 +229,7 @@ public class HConnectionMockImpl impleme
@Override
public void processBatchedGets(List<Get> actions, StringBytes tableName,
- ExecutorService pool, Result[] results, HBaseRPCOptions options)
+ ListeningExecutorService pool, Result[] results, HBaseRPCOptions options)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
@@ -235,7 +237,7 @@ public class HConnectionMockImpl impleme
@Override
public void processBatchedMutations(List<Mutation> actions,
- StringBytes tableName, ExecutorService pool, List<Mutation> failures,
+ StringBytes tableName, ListeningExecutorService pool, List<Mutation> failures,
HBaseRPCOptions options) throws IOException, InterruptedException {
// TODO Auto-generated method stub