You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2018/03/14 10:29:13 UTC

hbase git commit: HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write handler exhausted

Repository: hbase
Updated Branches:
  refs/heads/master 31978c31b -> 98ac4f12b


HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write handler exhausted

Signed-off-by: Yu Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/98ac4f12
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/98ac4f12
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/98ac4f12

Branch: refs/heads/master
Commit: 98ac4f12b5ccec708fc03ddfb96935c8cd7304e1
Parents: 31978c3
Author: Yu Li <li...@apache.org>
Authored: Wed Mar 14 18:18:50 2018 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Mar 14 18:25:17 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     |   1 +
 .../hadoop/hbase/regionserver/HRegion.java      |  86 +++++++-
 .../hadoop/hbase/regionserver/HStore.java       |  39 +++-
 .../hbase/regionserver/RSRpcServices.java       |   6 +
 .../apache/hadoop/hbase/regionserver/Store.java |   2 +
 .../throttle/StoreHotnessProtector.java         | 196 +++++++++++++++++++
 .../apache/hadoop/hbase/io/TestHeapSize.java    |   9 +
 .../throttle/TestStoreHotnessProtector.java     | 130 ++++++++++++
 8 files changed, 458 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 0039a56..f74b5e0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -89,6 +89,7 @@ public final class HConstants {
     NOT_RUN,
     SUCCESS,
     BAD_FAMILY,
+    STORE_TOO_BUSY,
     SANITY_CHECK_FAILURE,
     FAILURE
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f071baf..0a94846 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -146,6 +146,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
@@ -674,6 +675,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
       Bytes.BYTES_COMPARATOR);
 
+  private final StoreHotnessProtector storeHotnessProtector;
+
   /**
    * HRegion constructor. This constructor should only be used for testing and
    * extensions.  Instances of HRegion should be instantiated with the
@@ -794,6 +797,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
     this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
         DEFAULT_DURABILITY : htd.getDurability();
+
+    this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
+
     if (rsServices != null) {
       this.rsAccounting = this.rsServices.getRegionServerAccounting();
       // don't initialize coprocessors if not running within a regionserver
@@ -806,8 +812,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.metricsRegion = null;
     }
     if (LOG.isDebugEnabled()) {
-      // Write out region name as string and its encoded name.
-      LOG.debug("Instantiated " + this);
+      // Write out region name, its encoded name and storeHotnessProtector as string.
+      LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString());
     }
 
     configurationManager = Optional.empty();
@@ -3180,9 +3186,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!isOperationPending(lastIndexExclusive)) {
           continue;
         }
+
+        // HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting
+        // RS handlers, covering both MutationBatchOperation and ReplayBatchOperation
+        // The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't
+        // pass the isOperationPending check
+        Map<byte[], List<Cell>> curFamilyCellMap =
+            getMutation(lastIndexExclusive).getFamilyCellMap();
+        try {
+          // For performance, start the protector before acquiring the row lock; finish() is called
+          // on it if an exception is thrown afterwards
+          region.storeHotnessProtector.start(curFamilyCellMap);
+        } catch (RegionTooBusyException rtbe) {
+          region.storeHotnessProtector.finish(curFamilyCellMap);
+          if (isAtomic()) {
+            throw rtbe;
+          }
+          retCodeDetails[lastIndexExclusive] =
+              new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage());
+          continue;
+        }
+
         Mutation mutation = getMutation(lastIndexExclusive);
         // If we haven't got any rows in our batch, we should block to get the next one.
         RowLock rowLock = null;
+        boolean throwException = false;
         try {
           // if atomic then get exclusive lock, else shared lock
           rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
@@ -3190,16 +3218,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // NOTE: We will retry when other exceptions, but we should stop if we receive
           // TimeoutIOException or InterruptedIOException as operation has timed out or
           // interrupted respectively.
+          throwException = true;
           throw e;
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
           if (isAtomic()) { // fail, atomic means all or none
+            throwException = true;
             throw ioe;
           }
+        } catch (Throwable throwable) {
+          throwException = true;
+          throw throwable;
+        } finally {
+          if (throwException) {
+            region.storeHotnessProtector.finish(curFamilyCellMap);
+          }
         }
         if (rowLock == null) {
           // We failed to grab another lock
           if (isAtomic()) {
+            region.storeHotnessProtector.finish(curFamilyCellMap);
             throw new IOException("Can't apply all operations atomically!");
           }
           break; // Stop acquiring more rows for this batch
@@ -3285,7 +3323,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     public void doPostOpCleanupForMiniBatch(
         final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
-        boolean success) throws IOException {}
+        boolean success) throws IOException {
+      doFinishHotnessProtector(miniBatchOp);
+    }
+
+    private void doFinishHotnessProtector(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+      // check and return if the protector is not enabled
+      if (!region.storeHotnessProtector.isEnable()) {
+        return;
+      }
+      // miniBatchOp is null if and only if lockRowsAndBuildMiniBatch threw an exception.
+      // That case has already been handled.
+      if (miniBatchOp == null) {
+        return;
+      }
+
+      final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive();
+
+      for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) {
+        switch (retCodeDetails[i].getOperationStatusCode()) {
+          case SUCCESS:
+          case FAILURE:
+            region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap());
+            break;
+          default:
+            // do nothing
+            // We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the
+            // STORE_TOO_BUSY case is handled in StoreHotnessProtector#start
+            break;
+        }
+      }
+    }
 
     /**
      * Atomically apply the given map of family->edits to the memstore.
@@ -3504,6 +3573,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     @Override
     public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
         final WALEdit walEdit, boolean success) throws IOException {
+
+      super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success);
       if (miniBatchOp != null) {
         // synced so that the coprocessor contract is adhered to.
         if (region.coprocessorHost != null) {
@@ -4097,6 +4168,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
+    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
+      throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
     }
   }
 
@@ -7900,7 +7973,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       3 * Bytes.SIZEOF_BOOLEAN);
 
@@ -7927,6 +8000,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
       + ClassSize.STORE_SERVICES // store services
+      + StoreHotnessProtector.FIXED_SIZE
       ;
 
   @Override
@@ -8391,7 +8465,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   @Override
   public void onConfigurationChange(Configuration conf) {
-    // Do nothing for now.
+    this.storeHotnessProtector.update(conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 78e2bdb..d3a465e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -181,6 +182,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
 
   private final boolean verifyBulkLoads;
 
+  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
+  private final int parallelPutCountPrintThreshold;
+
   private ScanInfo scanInfo;
 
   // All access must be synchronized.
@@ -295,7 +299,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
         this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
             this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
     }
-    LOG.debug("Memstore type={}", className);
     this.offPeakHours = OffPeakHours.getInstance(conf);
 
     // Setting up cache configuration for this family
@@ -334,6 +337,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
               + flushRetriesNumber);
     }
     cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
+
+    int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
+    if (confPrintThreshold < 10) {
+      confPrintThreshold = 10;
+    }
+    this.parallelPutCountPrintThreshold = confPrintThreshold;
+    LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold="
+        + parallelPutCountPrintThreshold);
   }
 
   /**
@@ -697,9 +708,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   public void add(final Cell cell, MemStoreSizing memstoreSizing) {
     lock.readLock().lock();
     try {
-       this.memstore.add(cell, memstoreSizing);
+      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
+              .getColumnFamilyName() + " too Busy!");
+        }
+      }
+      this.memstore.add(cell, memstoreSizing);
     } finally {
       lock.readLock().unlock();
+      currentParallelPutCount.decrementAndGet();
     }
   }
 
@@ -709,9 +727,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
     lock.readLock().lock();
     try {
+      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
+              .getColumnFamilyName() + " too Busy!");
+        }
+      }
       memstore.add(cells, memstoreSizing);
     } finally {
       lock.readLock().unlock();
+      currentParallelPutCount.decrementAndGet();
     }
   }
 
@@ -2368,8 +2393,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
-              + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
+      ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
+              + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
@@ -2685,4 +2710,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     }
   }
 
+  public int getCurrentParallelPutCount() {
+    return currentParallelPutCount.get();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 803d3e8..c4fda68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -1039,6 +1040,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             builder.addResultOrException(getResultOrException(
               ClientProtos.Result.getDefaultInstance(), index));
             break;
+
+          case STORE_TOO_BUSY:
+            e = new RegionTooBusyException(codes[i].getExceptionMsg());
+            builder.addResultOrException(getResultOrException(e, index));
+            break;
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 042129f..6eb9f18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -280,4 +280,6 @@ public interface Store {
    * @return true if the memstore may need some extra memory space
    */
   boolean isSloppyMemStore();
+
+  int getCurrentParallelPutCount();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
new file mode 100644
index 0000000..a237a52
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+/**
+ * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
+ * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
+ * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
+ * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
+ * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
+ * <p>
+ * There are three key parameters:
+ * <p>
+ * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the number of columns exceeds this
+ * threshold, the protector takes effect; 100 by default.
+ * <p>
+ * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
+ * the same time.
+ * <p>
+ * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to
+ * prepare writing puts to a Store at the same time.
+ * <p>
+ * Notice that our write pipeline includes three key steps: MVCC acquire, writing MemStore, and
+ * WAL. Limiting only the concurrency of writing puts to a Store (parallelPutToStoreThreadLimit)
+ * is not enough, since the actual concurrency of puts may still exceed the limit when MVCC
+ * contention or slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
+ * <p>
+ * This protector is enabled by default and could be turned off by setting
+ * hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
+ */
+@InterfaceAudience.Private
+public class StoreHotnessProtector {
+  private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class);
+  private volatile int parallelPutToStoreThreadLimit;
+
+  private volatile int parallelPreparePutToStoreThreadLimit;
+  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
+      "hbase.region.store.parallel.put.limit";
+  public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
+      "hbase.region.store.parallel.prepare.put.multiplier";
+  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
+  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
+  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
+      "hbase.region.store.parallel.put.limit.min.column.count";
+  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
+  private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
+
+  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
+      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
+  private final Region region;
+
+  public StoreHotnessProtector(Region region, Configuration conf) {
+    init(conf);
+    this.region = region;
+  }
+
+  public void init(Configuration conf) {
+    this.parallelPutToStoreThreadLimit =
+        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
+    this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
+        DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
+    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
+        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
+            DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
+
+  }
+
+  public void update(Configuration conf) {
+    init(conf);
+    preparePutToStoreMap.clear();
+    LOG.debug("update config: " + toString());
+  }
+
+  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
+    if (!isEnable()) {
+      return;
+    }
+
+    String tooBusyStore = null;
+
+    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
+      Store store = this.region.getStore(e.getKey());
+      if (store == null || e.getValue() == null) {
+        continue;
+      }
+
+      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
+
+        //we need to try to add #preparePutCount at first because preparePutToStoreMap will be
+        //cleared when changing the configuration.
+        preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
+        AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
+        if (preparePutCounter == null) {
+          preparePutCounter = new AtomicInteger();
+          preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
+        }
+        int preparePutCount = preparePutCounter.incrementAndGet();
+        if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
+            || preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
+          tooBusyStore = (tooBusyStore == null ?
+              store.getColumnFamilyName() :
+              tooBusyStore + "," + store.getColumnFamilyName());
+        }
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
+              + "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
+        }
+      }
+    }
+
+    if (tooBusyStore != null) {
+      String msg =
+          "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
+              + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg);
+      }
+      throw new RegionTooBusyException(msg);
+    }
+  }
+
+  public void finish(Map<byte[], List<Cell>> familyMaps) {
+    if (!isEnable()) {
+      return;
+    }
+
+    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
+      Store store = this.region.getStore(e.getKey());
+      if (store == null || e.getValue() == null) {
+        continue;
+      }
+      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
+        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
+        // preparePutToStoreMap is cleared when the configuration changes, so the counter may go
+        // negative here; if so the decrement is undone. The count may be briefly inaccurate
+        // after such a change, which is an accepted trade-off for performance.
+        if (counter != null && counter.decrementAndGet() < 0) {
+          counter.incrementAndGet();
+        }
+      }
+    }
+  }
+
+  public String toString() {
+    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
+        + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
+        + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
+        + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
+        "enable" :
+        "disable");
+  }
+
+  public boolean isEnable() {
+    // feature is enabled when parallelPutToStoreThreadLimit > 0
+    return this.parallelPutToStoreThreadLimit > 0;
+  }
+
+  @VisibleForTesting
+  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
+    return preparePutToStoreMap;
+  }
+
+  public static final long FIXED_SIZE =
+      ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index f979397..2d454e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment;
 import org.apache.hadoop.hbase.regionserver.Segment;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -476,6 +477,14 @@ public class TestHeapSize  {
       assertEquals(expected, actual);
     }
 
+    cl = StoreHotnessProtector.class;
+    actual = StoreHotnessProtector.FIXED_SIZE;
+    expected = ClassSize.estimateBase(cl, false);
+    if (expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
+
     // Block cache key overhead. Only tests fixed overhead as estimating heap
     // size of strings is hard.
     cl = BlockCacheKey.class;

http://git-wip-us.apache.org/repos/asf/hbase/blob/98ac4f12/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
new file mode 100644
index 0000000..6d41934
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER;
+import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT;
+import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category(SmallTests.class)
+public class TestStoreHotnessProtector {
+
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestStoreHotnessProtector.class);
+
+  @Test(timeout = 60000)
+  public void testPreparePutCounter() throws Exception {
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+    Configuration conf = new Configuration();
+    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0);
+    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
+    conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
+    Region mockRegion = mock(Region.class);
+    StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf);
+
+    Store mockStore1 = mock(Store.class);
+    RegionInfo mockRegionInfo = mock(RegionInfo.class);
+    byte[] family = "testF1".getBytes();
+
+    when(mockRegion.getStore(family)).thenReturn(mockStore1);
+    when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
+    when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1");
+
+    when(mockStore1.getCurrentParallelPutCount()).thenReturn(1);
+    when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1");
+
+    final Map<byte[], List<Cell>> familyMaps = new HashMap<>();
+    familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class)));
+
+    final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    // Case 1: the prepare-put counter stays within the limit
+
+    int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf
+        .getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
+    CountDownLatch countDownLatch = new CountDownLatch(threadCount);
+
+    for (int i = 0; i < threadCount; i++) {
+      executorService.execute(() -> {
+        try {
+          storeHotnessProtector.start(familyMaps);
+        } catch (RegionTooBusyException e) {
+          e.printStackTrace();
+          exception.set(e);
+        } finally {
+          countDownLatch.countDown();
+        }
+      });
+    }
+
+    countDownLatch.await(60, TimeUnit.SECONDS);
+    //no exception
+    Assert.assertEquals(exception.get(), null);
+    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
+    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
+        threadCount);
+
+    // Case 2: the prepare-put counter exceeds the limit
+
+    try {
+      storeHotnessProtector.start(familyMaps);
+    } catch (RegionTooBusyException e) {
+      e.printStackTrace();
+      exception.set(e);
+    }
+
+    Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class);
+
+    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
+    // start() increments the counter even when it rejects the call; finish() decrements it.
+    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
+        threadCount + 1);
+
+    storeHotnessProtector.finish(familyMaps);
+    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
+        threadCount);
+  }
+
+}
\ No newline at end of file