You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2018/03/14 10:29:13 UTC
hbase git commit: HBASE-19389 Limit concurrency of put with dense
(hundreds) columns to prevent write handler exhausted
Repository: hbase
Updated Branches:
refs/heads/master 31978c31b -> 98ac4f12b
HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write handler exhausted
Signed-off-by: Yu Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/98ac4f12
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/98ac4f12
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/98ac4f12
Branch: refs/heads/master
Commit: 98ac4f12b5ccec708fc03ddfb96935c8cd7304e1
Parents: 31978c3
Author: Yu Li <li...@apache.org>
Authored: Wed Mar 14 18:18:50 2018 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Mar 14 18:25:17 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 1 +
.../hadoop/hbase/regionserver/HRegion.java | 86 +++++++-
.../hadoop/hbase/regionserver/HStore.java | 39 +++-
.../hbase/regionserver/RSRpcServices.java | 6 +
.../apache/hadoop/hbase/regionserver/Store.java | 2 +
.../throttle/StoreHotnessProtector.java | 196 +++++++++++++++++++
.../apache/hadoop/hbase/io/TestHeapSize.java | 9 +
.../throttle/TestStoreHotnessProtector.java | 130 ++++++++++++
8 files changed, 458 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 0039a56..f74b5e0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -89,6 +89,7 @@ public final class HConstants {
NOT_RUN,
SUCCESS,
BAD_FAMILY,
+ STORE_TOO_BUSY,
SANITY_CHECK_FAILURE,
FAILURE
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f071baf..0a94846 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -146,6 +146,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
@@ -674,6 +675,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
+ private final StoreHotnessProtector storeHotnessProtector;
+
/**
* HRegion constructor. This constructor should only be used for testing and
* extensions. Instances of HRegion should be instantiated with the
@@ -794,6 +797,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
DEFAULT_DURABILITY : htd.getDurability();
+
+ this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
+
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver
@@ -806,8 +812,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.metricsRegion = null;
}
if (LOG.isDebugEnabled()) {
- // Write out region name as string and its encoded name.
- LOG.debug("Instantiated " + this);
+ // Write out region name, its encoded name and storeHotnessProtector as string.
+ LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString());
}
configurationManager = Optional.empty();
@@ -3180,9 +3186,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!isOperationPending(lastIndexExclusive)) {
continue;
}
+
+ // HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting
+ // RS handlers, covering both MutationBatchOperation and ReplayBatchOperation
+ // The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't
+ // pass the isOperationPending check
+ Map<byte[], List<Cell>> curFamilyCellMap =
+ getMutation(lastIndexExclusive).getFamilyCellMap();
+ try {
+ // start the protector before acquiring row lock considering performance, and will finish
+ // it when encountering exception
+ region.storeHotnessProtector.start(curFamilyCellMap);
+ } catch (RegionTooBusyException rtbe) {
+ region.storeHotnessProtector.finish(curFamilyCellMap);
+ if (isAtomic()) {
+ throw rtbe;
+ }
+ retCodeDetails[lastIndexExclusive] =
+ new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage());
+ continue;
+ }
+
Mutation mutation = getMutation(lastIndexExclusive);
// If we haven't got any rows in our batch, we should block to get the next one.
RowLock rowLock = null;
+ boolean throwException = false;
try {
// if atomic then get exclusive lock, else shared lock
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
@@ -3190,16 +3218,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// NOTE: We will retry when other exceptions, but we should stop if we receive
// TimeoutIOException or InterruptedIOException as operation has timed out or
// interrupted respectively.
+ throwException = true;
throw e;
} catch (IOException ioe) {
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
if (isAtomic()) { // fail, atomic means all or none
+ throwException = true;
throw ioe;
}
+ } catch (Throwable throwable) {
+ throwException = true;
+ throw throwable;
+ } finally {
+ if (throwException) {
+ region.storeHotnessProtector.finish(curFamilyCellMap);
+ }
}
if (rowLock == null) {
// We failed to grab another lock
if (isAtomic()) {
+ region.storeHotnessProtector.finish(curFamilyCellMap);
throw new IOException("Can't apply all operations atomically!");
}
break; // Stop acquiring more rows for this batch
@@ -3285,7 +3323,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public void doPostOpCleanupForMiniBatch(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
- boolean success) throws IOException {}
+ boolean success) throws IOException {
+ doFinishHotnessProtector(miniBatchOp);
+ }
+
+ private void doFinishHotnessProtector(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ // check and return if the protector is not enabled
+ if (!region.storeHotnessProtector.isEnable()) {
+ return;
+ }
+ // miniBatchOp is null, if and only if lockRowsAndBuildMiniBatch throwing exception.
+ // This case was handled.
+ if (miniBatchOp == null) {
+ return;
+ }
+
+ final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive();
+
+ for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) {
+ switch (retCodeDetails[i].getOperationStatusCode()) {
+ case SUCCESS:
+ case FAILURE:
+ region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap());
+ break;
+ default:
+ // do nothing
+ // We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the
+ // STORE_TOO_BUSY case is handled in StoreHotnessProtector#start
+ break;
+ }
+ }
+ }
/**
* Atomically apply the given map of family->edits to the memstore.
@@ -3504,6 +3573,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
final WALEdit walEdit, boolean success) throws IOException {
+
+ super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success);
if (miniBatchOp != null) {
// synced so that the coprocessor contract is adhered to.
if (region.coprocessorHost != null) {
@@ -4097,6 +4168,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
+ } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
+ throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
}
}
@@ -7900,7 +7973,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+ 51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
3 * Bytes.SIZEOF_BOOLEAN);
@@ -7927,6 +8000,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
+ ClassSize.STORE_SERVICES // store services
+ + StoreHotnessProtector.FIXED_SIZE
;
@Override
@@ -8391,7 +8465,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
@Override
public void onConfigurationChange(Configuration conf) {
- // Do nothing for now.
+ this.storeHotnessProtector.update(conf);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 78e2bdb..d3a465e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1,4 +1,4 @@
-/**
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -181,6 +182,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
private final boolean verifyBulkLoads;
+ private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
+ private final int parallelPutCountPrintThreshold;
+
private ScanInfo scanInfo;
// All access must be synchronized.
@@ -295,7 +299,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
}
- LOG.debug("Memstore type={}", className);
this.offPeakHours = OffPeakHours.getInstance(conf);
// Setting up cache configuration for this family
@@ -334,6 +337,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
+ flushRetriesNumber);
}
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
+
+ int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
+ if (confPrintThreshold < 10) {
+ confPrintThreshold = 10;
+ }
+ this.parallelPutCountPrintThreshold = confPrintThreshold;
+ LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold="
+ + parallelPutCountPrintThreshold);
}
/**
@@ -697,9 +708,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
public void add(final Cell cell, MemStoreSizing memstoreSizing) {
lock.readLock().lock();
try {
- this.memstore.add(cell, memstoreSizing);
+ if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
+ .getColumnFamilyName() + " too Busy!");
+ }
+ }
+ this.memstore.add(cell, memstoreSizing);
} finally {
lock.readLock().unlock();
+ currentParallelPutCount.decrementAndGet();
}
}
@@ -709,9 +727,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
lock.readLock().lock();
try {
+ if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
+ .getColumnFamilyName() + " too Busy!");
+ }
+ }
memstore.add(cells, memstoreSizing);
} finally {
lock.readLock().unlock();
+ currentParallelPutCount.decrementAndGet();
}
}
@@ -2368,8 +2393,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
public static final long FIXED_OVERHEAD =
- ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
- + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
+ ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
+ + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
@@ -2685,4 +2710,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
}
+ public int getCurrentParallelPutCount() {
+ return currentParallelPutCount.get();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 803d3e8..c4fda68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -1039,6 +1040,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.addResultOrException(getResultOrException(
ClientProtos.Result.getDefaultInstance(), index));
break;
+
+ case STORE_TOO_BUSY:
+ e = new RegionTooBusyException(codes[i].getExceptionMsg());
+ builder.addResultOrException(getResultOrException(e, index));
+ break;
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 042129f..6eb9f18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -280,4 +280,6 @@ public interface Store {
* @return true if the memstore may need some extra memory space
*/
boolean isSloppyMemStore();
+
+ int getCurrentParallelPutCount();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
new file mode 100644
index 0000000..a237a52
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+/**
+ * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns; it
+ * makes a best effort to avoid exhausting all of the RS's handlers. When many clients write
+ * requests with dense (hundreds of) columns to a Store at the same time, the RS can end up blocked
+ * because CSLM degrades as concurrency goes up. This is not throttling: throttling is
+ * user-oriented, while StoreHotnessProtector is a system-oriented, RS self-protection mechanism.
+ * <p>
+ * There are three key parameters:
+ * <p>
+ * 1. parallelPutToStoreThreadLimitCheckMinColumnCount
+ * ({@code hbase.region.store.parallel.put.limit.min.column.count}): the protector only applies to
+ * a Store when a single mutation carries more cells for it than this threshold; 100 by default.
+ * <p>
+ * 2. parallelPutToStoreThreadLimit ({@code hbase.region.store.parallel.put.limit}): the number of
+ * puts allowed to write to a Store at the same time; 10 by default.
+ * <p>
+ * 3. parallelPreparePutToStoreThreadLimit: the number of puts allowed to be preparing to write to
+ * a Store at the same time, computed as parallelPutToStoreThreadLimit multiplied by
+ * {@code hbase.region.store.parallel.prepare.put.multiplier} (2 by default).
+ * <p>
+ * Notice that our writing pipeline includes three key processes: MVCC acquire, writing the
+ * MemStore, and WAL. Limiting only the concurrency of writing puts to a Store
+ * (parallelPutToStoreThreadLimit) is not enough, since the actual concurrency of puts may still
+ * exceed the limit when MVCC contention or a slow WAL sync happens. This is why
+ * parallelPreparePutToStoreThreadLimit is needed.
+ * <p>
+ * Usage contract: every call to {@link #start(Map)}, including one that throws
+ * {@link RegionTooBusyException}, must be paired with a call to {@link #finish(Map)} on the same
+ * family map, because start() leaves its prepare-put increments in place when it rejects a put.
+ * <p>
+ * This protector is enabled by default and can be turned off by setting
+ * hbase.region.store.parallel.put.limit to 0; it supports online configuration change through
+ * {@link #update(Configuration)}.
+ */
+@InterfaceAudience.Private
+public class StoreHotnessProtector {
+  private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class);
+
+  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
+      "hbase.region.store.parallel.put.limit";
+  public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
+      "hbase.region.store.parallel.prepare.put.multiplier";
+  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
+      "hbase.region.store.parallel.put.limit.min.column.count";
+  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
+  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
+  private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
+
+  // Limits are volatile so that update() (online configuration change) is visible to handler
+  // threads calling start()/finish() without locking.
+  private volatile int parallelPutToStoreThreadLimit;
+  private volatile int parallelPreparePutToStoreThreadLimit;
+  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
+
+  /**
+   * Per-family count of puts that have passed start() and not yet called finish(), keyed by
+   * column family name using Bytes.BYTES_RAWCOMPARATOR. Cleared on every update().
+   */
+  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
+      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
+  private final Region region;
+
+  public StoreHotnessProtector(Region region, Configuration conf) {
+    init(conf);
+    this.region = region;
+  }
+
+  /**
+   * (Re)reads the three limits from the given configuration. The prepare-put limit is the
+   * configured multiplier times the parallel put limit.
+   */
+  public void init(Configuration conf) {
+    this.parallelPutToStoreThreadLimit =
+        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
+    this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
+        DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
+    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
+        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
+            DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
+  }
+
+  /**
+   * Applies a configuration change: reloads the limits and resets all prepare-put counters.
+   * Counters may be briefly inaccurate afterwards for puts that were in flight during the reset.
+   */
+  public void update(Configuration conf) {
+    init(conf);
+    preparePutToStoreMap.clear();
+    LOG.debug("update config: " + toString());
+  }
+
+  /**
+   * Registers a put with every dense family in {@code familyMaps} and rejects it if any of those
+   * stores is above either limit. Families with no store in the region, a null cell list, or at
+   * most parallelPutToStoreThreadLimitCheckMinColumnCount cells are ignored.
+   *
+   * @param familyMaps the mutation's cells grouped by column family
+   * @throws RegionTooBusyException if any dense store exceeds parallelPutToStoreThreadLimit or
+   *           parallelPreparePutToStoreThreadLimit. The prepare-put increments made by this call
+   *           are NOT rolled back; the caller must still call {@link #finish(Map)}.
+   */
+  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
+    if (!isEnable()) {
+      return;
+    }
+
+    String tooBusyStore = null;
+    boolean aboveParallelPutLimit = false;
+    boolean aboveParallelPreparePutLimit = false;
+
+    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
+      Store store = this.region.getStore(e.getKey());
+      if (store == null || e.getValue() == null) {
+        continue;
+      }
+
+      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
+        // Count this put as "preparing" before checking the limits. update() may clear the map
+        // concurrently; computeIfAbsent returns the counter that is mapped for this family (not a
+        // fresh instance that lost an insertion race), so finish() decrements the same counter
+        // this increment landed on.
+        int preparePutCount = preparePutToStoreMap
+            .computeIfAbsent(e.getKey(), family -> new AtomicInteger()).incrementAndGet();
+        int currentParallelPutCount = store.getCurrentParallelPutCount();
+
+        boolean storeAbovePutLimit = currentParallelPutCount > this.parallelPutToStoreThreadLimit;
+        boolean storeAbovePreparePutLimit =
+            preparePutCount > this.parallelPreparePutToStoreThreadLimit;
+        if (storeAbovePutLimit || storeAbovePreparePutLimit) {
+          tooBusyStore = (tooBusyStore == null ?
+              store.getColumnFamilyName() :
+              tooBusyStore + "," + store.getColumnFamilyName());
+        }
+        aboveParallelPutLimit |= storeAbovePutLimit;
+        aboveParallelPreparePutLimit |= storeAbovePreparePutLimit;
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
+              + "; currentParallelPutCount=" + currentParallelPutCount);
+        }
+      }
+    }
+
+    if (tooBusyStore != null) {
+      // Name the limit(s) actually exceeded so operators know which knob to tune.
+      StringBuilder exceededLimits = new StringBuilder();
+      if (aboveParallelPutLimit) {
+        exceededLimits.append("parallelPutToStoreThreadLimit(")
+            .append(this.parallelPutToStoreThreadLimit).append(")");
+      }
+      if (aboveParallelPreparePutLimit) {
+        if (exceededLimits.length() > 0) {
+          exceededLimits.append(" or ");
+        }
+        exceededLimits.append("parallelPreparePutToStoreThreadLimit(")
+            .append(this.parallelPreparePutToStoreThreadLimit).append(")");
+      }
+      String msg =
+          "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
+              + " Above " + exceededLimits;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg);
+      }
+      throw new RegionTooBusyException(msg);
+    }
+  }
+
+  /**
+   * Releases the prepare-put slots taken by the matching {@link #start(Map)} call. Uses the same
+   * family filtering as start(), so it only touches counters start() could have incremented.
+   *
+   * @param familyMaps the same family map that was passed to start()
+   */
+  public void finish(Map<byte[], List<Cell>> familyMaps) {
+    if (!isEnable()) {
+      return;
+    }
+
+    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
+      Store store = this.region.getStore(e.getKey());
+      if (store == null || e.getValue() == null) {
+        continue;
+      }
+      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
+        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
+        if (counter != null) {
+          // preparePutToStoreMap is cleared when the configuration changes, so a finish() may
+          // arrive for a start() whose increment was discarded. Clamp at zero in a single CAS so
+          // concurrent start() calls never observe a transiently negative count. The count may
+          // be inaccurate for a short time after a reset; that is an accepted trade-off.
+          counter.updateAndGet(count -> count > 0 ? count - 1 : 0);
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
+        + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
+        + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
+        + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
+        "enable" :
+        "disable");
+  }
+
+  /** @return true when parallelPutToStoreThreadLimit is positive, i.e. the feature is enabled */
+  public boolean isEnable() {
+    return this.parallelPutToStoreThreadLimit > 0;
+  }
+
+  @VisibleForTesting
+  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
+    return preparePutToStoreMap;
+  }
+
+  // Two references (preparePutToStoreMap, region) and three int limits; must stay in sync with
+  // the instance fields above (checked by TestHeapSize).
+  public static final long FIXED_SIZE =
+      ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index f979397..2d454e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment;
import org.apache.hadoop.hbase.regionserver.Segment;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -476,6 +477,14 @@ public class TestHeapSize {
assertEquals(expected, actual);
}
+ cl = StoreHotnessProtector.class;
+ actual = StoreHotnessProtector.FIXED_SIZE;
+ expected = ClassSize.estimateBase(cl, false);
+ if (expected != actual) {
+ ClassSize.estimateBase(cl, true);
+ assertEquals(expected, actual);
+ }
+
// Block cache key overhead. Only tests fixed overhead as estimating heap
// size of strings is hard.
cl = BlockCacheKey.class;
http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
new file mode 100644
index 0000000..6d41934
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER;
+import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT;
+import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category(SmallTests.class)
+public class TestStoreHotnessProtector {
+
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestStoreHotnessProtector.class);
+
+  /**
+   * Checks that exactly {@code limit * multiplier} concurrent start() calls on one dense family
+   * are admitted, that the next call is rejected with {@link RegionTooBusyException} while keeping
+   * its increment, and that finish() releases one prepare-put slot.
+   */
+  @Test(timeout = 60000)
+  public void testPreparePutCounter() throws Exception {
+    Configuration conf = new Configuration();
+    // min column count 0: any family with at least one cell is subject to the protector
+    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0);
+    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
+    conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
+    Region mockRegion = mock(Region.class);
+    StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf);
+
+    Store mockStore1 = mock(Store.class);
+    RegionInfo mockRegionInfo = mock(RegionInfo.class);
+    byte[] family = "testF1".getBytes();
+
+    when(mockRegion.getStore(family)).thenReturn(mockStore1);
+    when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
+    when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1");
+
+    // keep the parallel put count below its limit so only the prepare-put limit is exercised
+    when(mockStore1.getCurrentParallelPutCount()).thenReturn(1);
+    when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1");
+
+    final Map<byte[], List<Cell>> familyMaps = new HashMap<>();
+    familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class)));
+
+    final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    // Phase 1: fill the prepare-put limit exactly; no call may be rejected.
+    int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf
+        .getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
+    CountDownLatch countDownLatch = new CountDownLatch(threadCount);
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    try {
+      for (int i = 0; i < threadCount; i++) {
+        executorService.execute(() -> {
+          try {
+            storeHotnessProtector.start(familyMaps);
+          } catch (RegionTooBusyException e) {
+            exception.set(e);
+          } finally {
+            countDownLatch.countDown();
+          }
+        });
+      }
+      // Fail with a clear message on timeout instead of asserting on a half-updated counter.
+      Assert.assertTrue("start() tasks did not complete in time",
+          countDownLatch.await(60, TimeUnit.SECONDS));
+    } finally {
+      executorService.shutdownNow();
+    }
+
+    Assert.assertNull("no start() call should be rejected below the limit", exception.get());
+    Assert.assertEquals(1, storeHotnessProtector.getPreparePutToStoreMap().size());
+    Assert.assertEquals(threadCount,
+        storeHotnessProtector.getPreparePutToStoreMap().get(family).get());
+
+    // Phase 2: one more start() exceeds the prepare-put limit and must be rejected.
+    try {
+      storeHotnessProtector.start(familyMaps);
+      Assert.fail("expected RegionTooBusyException once the prepare-put limit is exceeded");
+    } catch (RegionTooBusyException expected) {
+      // expected
+    }
+
+    Assert.assertEquals(1, storeHotnessProtector.getPreparePutToStoreMap().size());
+    // A rejected start() keeps its increment; the caller is responsible for calling finish().
+    Assert.assertEquals(threadCount + 1,
+        storeHotnessProtector.getPreparePutToStoreMap().get(family).get());
+
+    storeHotnessProtector.finish(familyMaps);
+    Assert.assertEquals(threadCount,
+        storeHotnessProtector.getPreparePutToStoreMap().get(family).get());
+  }
+
+}
\ No newline at end of file