You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/05/25 01:43:50 UTC

[incubator-hudi] branch master updated: default implementation for HBase index qps allocator (#685)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f2d91a4  default implementation for HBase index qps allocator (#685)
f2d91a4 is described below

commit f2d91a455ec17e66ad319f47a2d5ad45d62c9ca3
Author: Venkat <33...@users.noreply.github.com>
AuthorDate: Fri May 24 18:43:46 2019 -0700

    default implementation for HBase index qps allocator (#685)
    
    * default implementation and configs for HBase index qps allocator
    
    * Test for QPS allocator and address CR
    
    * fix QPS allocator test
---
 .../uber/hoodie/config/HoodieHBaseIndexConfig.java | 118 ++++++++++++++---
 .../com/uber/hoodie/config/HoodieWriteConfig.java  |  32 +++++
 .../hbase/DefaultHBaseQPSResourceAllocator.java    |  52 ++++++++
 .../com/uber/hoodie/index/hbase/HBaseIndex.java    |  74 +++++++++--
 .../hbase/HBaseIndexQPSResourceAllocator.java      |  51 ++++++++
 .../index/TestHBaseQPSResourceAllocator.java       | 144 +++++++++++++++++++++
 .../java/com/uber/hoodie/index/TestHbaseIndex.java |  20 ++-
 7 files changed, 461 insertions(+), 30 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
index 3c6f1cf..b60ba5f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
@@ -18,6 +18,7 @@
 
 package com.uber.hoodie.config;
 
+import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -35,6 +36,12 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
    * be honored for HBase Puts
    */
   public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
+
+  /**
+   * Property to set which implementation of HBase QPS resource allocator to be used
+   */
+  public static final String HBASE_INDEX_QPS_ALLOCATOR_CLASS = "hoodie.index.hbase.qps.allocator.class";
+  public static final String DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS = DefaultHBaseQPSResourceAllocator.class.getName();
   /**
    * Property to set to enable auto computation of put batch size
    */
@@ -69,6 +76,34 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
    */
   public static final float DEFAULT_HBASE_QPS_FRACTION = 0.5f;
 
+  /**
+   *  Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on volume
+   */
+  public static final String HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = "hoodie.index.hbase.dynamic_qps";
+  public static final boolean DEFAULT_HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = false;
+  /**
+   *  Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads
+   */
+  public static final String HBASE_MIN_QPS_FRACTION_PROP = "hoodie.index.hbase.min.qps.fraction";
+  public static final String DEFAULT_HBASE_MIN_QPS_FRACTION_PROP = "0.002";
+
+  public static final String HBASE_MAX_QPS_FRACTION_PROP = "hoodie.index.hbase.max.qps.fraction";
+  public static final String DEFAULT_HBASE_MAX_QPS_FRACTION_PROP = "0.06";
+  /**
+   *  Hoodie index desired puts operation time in seconds
+   */
+  public static final String HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = "hoodie.index.hbase.desired_puts_time_in_secs";
+  public static final int DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = 600;
+  public static final String HBASE_SLEEP_MS_PUT_BATCH_PROP = "hoodie.index.hbase.sleep.ms.for.put.batch";
+  public static final String HBASE_SLEEP_MS_GET_BATCH_PROP = "hoodie.index.hbase.sleep.ms.for.get.batch";
+  public static final String HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS = "hoodie.index.hbase.zk.session_timeout_ms";
+  public static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
+  public static final String HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS =
+      "hoodie.index.hbase.zk.connection_timeout_ms";
+  public static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
+  public static final String HBASE_ZK_PATH_QPS_ROOT = "hoodie.index.hbase.zkpath.qps_root";
+  public static final String DEFAULT_HBASE_ZK_PATH_QPS_ROOT = "/QPS_ROOT";
+
   public HoodieHBaseIndexConfig(final Properties props) {
       super(props);
   }
@@ -111,26 +146,68 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    public HoodieHBaseIndexConfig.Builder hbaseIndexGetBatchSize(int getBatchSize) {
+    public Builder hbaseZkZnodeQPSPath(String zkZnodeQPSPath) {
+      props.setProperty(HBASE_ZK_PATH_QPS_ROOT, zkZnodeQPSPath);
+      return this;
+    }
+
+    public Builder hbaseIndexGetBatchSize(int getBatchSize) {
       props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
       return this;
     }
 
-    public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSize(int putBatchSize) {
+    public Builder hbaseIndexPutBatchSize(int putBatchSize) {
       props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
       return this;
     }
 
-    public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSizeAutoCompute(
-        boolean putBatchSizeAutoCompute) {
-      props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP,
-          String.valueOf(putBatchSizeAutoCompute));
+    public Builder hbaseIndexPutBatchSizeAutoCompute(boolean putBatchSizeAutoCompute) {
+      props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(putBatchSizeAutoCompute));
+      return this;
+    }
+
+    public Builder hbaseIndexDesiredPutsTime(int desiredPutsTime) {
+      props.setProperty(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(desiredPutsTime));
+      return this;
+    }
+
+    public Builder hbaseIndexShouldComputeQPSDynamically(boolean shouldComputeQPsDynamically) {
+      props.setProperty(HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY, String.valueOf(shouldComputeQPsDynamically));
+      return this;
+    }
+
+    public Builder hbaseIndexQPSFraction(float qpsFraction) {
+      props.setProperty(HBASE_QPS_FRACTION_PROP, String.valueOf(qpsFraction));
+      return this;
+    }
+
+    public Builder hbaseIndexMinQPSFraction(float minQPSFraction) {
+      props.setProperty(HBASE_MIN_QPS_FRACTION_PROP, String.valueOf(minQPSFraction));
+      return this;
+    }
+
+    public Builder hbaseIndexMaxQPSFraction(float maxQPSFraction) {
+      props.setProperty(HBASE_MAX_QPS_FRACTION_PROP, String.valueOf(maxQPSFraction));
+      return this;
+    }
+
+    public Builder hbaseIndexSleepMsBetweenPutBatch(int sleepMsBetweenPutBatch) {
+      props.setProperty(HBASE_SLEEP_MS_PUT_BATCH_PROP, String.valueOf(sleepMsBetweenPutBatch));
+      return this;
+    }
+
+    public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
+      props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
+      return this;
+    }
+
+    public Builder hbaseIndexZkSessionTimeout(int zkSessionTimeout) {
+      props.setProperty(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(zkSessionTimeout));
       return this;
     }
 
-    public HoodieHBaseIndexConfig.Builder hbaseIndexQPSFraction(float qpsFraction) {
-      props.setProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP,
-          String.valueOf(qpsFraction));
+    public Builder hbaseIndexZkConnectionTimeout(int zkConnectionTimeout) {
+      props.setProperty(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(zkConnectionTimeout));
       return this;
     }
 
@@ -166,14 +243,25 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
           HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
       setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP),
-          HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP,
-          String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE));
+          HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE));
       setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP),
           HBASE_QPS_FRACTION_PROP, String.valueOf(DEFAULT_HBASE_QPS_FRACTION));
-      setDefaultOnCondition(props,
-          !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP),
-          HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf(
-              DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER));
+      setDefaultOnCondition(props, !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP),
+          HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf(DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER));
+      setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY),
+          HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY, String.valueOf(DEFAULT_HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY));
+      setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS),
+          HBASE_INDEX_QPS_ALLOCATOR_CLASS, String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
+      setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS),
+          HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
+      setDefaultOnCondition(props, !props.containsKey(HBASE_ZK_PATH_QPS_ROOT),
+          HBASE_ZK_PATH_QPS_ROOT, String.valueOf(DEFAULT_HBASE_ZK_PATH_QPS_ROOT));
+      setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS),
+          HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_SESSION_TIMEOUT_MS));
+      setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS),
+          HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS));
+      setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS),
+          HBASE_INDEX_QPS_ALLOCATOR_CLASS, String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
       return config;
     }
 
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 3c69fa9..657d68f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -315,6 +315,30 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP));
   }
 
+  public String getHBaseQPSResourceAllocatorClass() {
+    return props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_QPS_ALLOCATOR_CLASS);
+  }
+
+  public String getHBaseQPSZKnodePath() {
+    return props.getProperty(HoodieHBaseIndexConfig.HBASE_ZK_PATH_QPS_ROOT);
+  }
+
+  public String getHBaseZkZnodeSessionTimeout() {
+    return props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS);
+  }
+
+  public String getHBaseZkZnodeConnectionTimeout() {
+    return props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS);
+  }
+
+  public boolean getHBaseIndexShouldComputeQPSDynamically() {
+    return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY));
+  }
+
+  public int getHBaseIndexDesiredPutsTime() {
+    return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
+  }
+
   /**
    * Fraction of the global share of QPS that should be allocated to this job.
    * Let's say there are 3 jobs which have input size in terms of number of rows
@@ -325,6 +349,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP));
   }
 
+  public float getHBaseIndexMinQPSFraction() {
+    return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_MIN_QPS_FRACTION_PROP));
+  }
+
+  public float getHBaseIndexMaxQPSFraction() {
+    return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_FRACTION_PROP));
+  }
+
   /**
    * This should be same across various jobs. This is intended to limit the aggregate
    * QPS generated across various Hoodie jobs to an Hbase Region Server
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/DefaultHBaseQPSResourceAllocator.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/DefaultHBaseQPSResourceAllocator.java
new file mode 100644
index 0000000..1f74dd2
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/DefaultHBaseQPSResourceAllocator.java
@@ -0,0 +1,52 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.index.hbase;
+
+import com.uber.hoodie.config.HoodieWriteConfig;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Default {@link HBaseIndexQPSResourceAllocator} implementation. It performs no coordination: the QPS
+ * fraction is read from the write config, acquisition grants whatever was requested, and release only logs.
+ */
+public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAllocator {
+  private static final Logger logger = LogManager.getLogger(DefaultHBaseQPSResourceAllocator.class);
+  private final HoodieWriteConfig hoodieWriteConfig;
+
+  /**
+   * @param hoodieWriteConfig write config from which the QPS fraction and HBase table name are read
+   */
+  public DefaultHBaseQPSResourceAllocator(HoodieWriteConfig hoodieWriteConfig) {
+    this.hoodieWriteConfig = hoodieWriteConfig;
+  }
+
+  /**
+   * Returns the QPS fraction configured in the write config; both arguments are ignored.
+   *
+   * @param numPuts          unused by this implementation
+   * @param numRegionServers unused by this implementation
+   * @return the value of {@code hoodieWriteConfig.getHbaseIndexQPSFraction()}
+   */
+  @Override
+  public float calculateQPSFractionForPutsTime(final long numPuts, final int numRegionServers) {
+    // Just return the configured qps_fraction without calculating it runtime
+    return hoodieWriteConfig.getHbaseIndexQPSFraction();
+  }
+
+  /**
+   * Grants exactly the fraction that was asked for.
+   *
+   * @param desiredQPSFraction fraction being requested
+   * @param numPuts            unused by this implementation
+   * @return {@code desiredQPSFraction}, unchanged
+   */
+  @Override
+  public float acquireQPSResources(final float desiredQPSFraction, final long numPuts) {
+    // Return the requested QPSFraction in this default implementation
+    return desiredQPSFraction;
+  }
+
+  /**
+   * Logs the call and does nothing else, since this implementation never holds any resources.
+   */
+  @Override
+  public void releaseQPSResources() {
+    // Do nothing, as there are no resources locked in default implementation
+    logger.info(String.format("Release QPS resources called for %s with default implementation, do nothing",
+        this.hoodieWriteConfig.getHbaseTableName()));
+  }
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
index c706858..e275b03 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
@@ -28,6 +28,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
@@ -60,6 +61,8 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
 
+import scala.Tuple2;
+
 /**
  * Hoodie Index implementation backed by HBase
  */
@@ -99,10 +102,24 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
   }
 
   private void init(HoodieWriteConfig config) {
-    multiPutBatchSize = config.getHbaseIndexGetBatchSize();
-    qpsFraction = config.getHbaseIndexQPSFraction();
-    maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
-    putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
+    this.multiPutBatchSize = config.getHbaseIndexGetBatchSize();
+    this.qpsFraction = config.getHbaseIndexQPSFraction();
+    this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
+    this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
+  }
+
+  /**
+   * Builds the QPS resource allocator named by {@code config.getHBaseQPSResourceAllocatorClass()} via
+   * {@code ReflectionUtils.loadClass}, passing {@code config} along. Any exception raised while logging,
+   * loading or casting is logged at WARN level and a {@link DefaultHBaseQPSResourceAllocator} is returned
+   * instead, so a bad class name does not fail the caller.
+   *
+   * @param config write config naming the allocator class; also handed to the created allocator
+   * @return the configured allocator, or the default one when it could not be created
+   */
+  @VisibleForTesting
+  public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {
+    HBaseIndexQPSResourceAllocator allocator;
+    try {
+      logger.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass());
+      allocator = (HBaseIndexQPSResourceAllocator) ReflectionUtils.loadClass(
+          config.getHBaseQPSResourceAllocatorClass(), config);
+    } catch (Exception e) {
+      logger.warn("error while instantiating HBaseIndexQPSResourceAllocator", e);
+      // Fall back to the no-op default rather than propagating the failure
+      allocator = new DefaultHBaseQPSResourceAllocator(config);
+    }
+    return allocator;
   }
 
   @Override
@@ -351,12 +368,27 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
+  /**
+   * Sizes the HBase put batches, applies {@code updateLocationFunction()} to every partition of the given
+   * write statuses, and releases the QPS resources obtained from a freshly created allocator.
+   *
+   * @param writeStatusRDD write statuses to process
+   * @param jsc            Spark context, passed on to the batch-size computation
+   * @param hoodieTable    not referenced in this method body
+   * @return the input {@code writeStatusRDD} when dynamic QPS computation is enabled, otherwise the
+   *         mapped RDD
+   */
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    setPutBatchSize(writeStatusRDD, jsc);
-    return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
+    JavaRDD<WriteStatus> writeStatusResultRDD;
+    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
+    logger.info("multiPutBatchSize: before puts" + multiPutBatchSize);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(
+        updateLocationFunction(), true);
+    // Forcing a spark action so HBase puts are triggered before releasing resources
+    if (this.config.getHBaseIndexShouldComputeQPSDynamically()) {
+      logger.info("writestatus count: " + writeStatusJavaRDD.count());
+      // NOTE(review): this returns the input RDD, not writeStatusJavaRDD, so whatever
+      // updateLocationFunction() produces is dropped for callers - confirm that is intended.
+      writeStatusResultRDD = writeStatusRDD;
+    } else {
+      // NOTE(review): no action is run on this path, so the puts have presumably not executed yet when
+      // releaseQPSResources() is called below - confirm this is acceptable for non-default allocators.
+      writeStatusResultRDD = writeStatusJavaRDD;
+    }
+    // Release QPS resources as HBase puts are done at this point
+    hBaseIndexQPSResourceAllocator.releaseQPSResources();
+    return writeStatusResultRDD;
   }
 
   private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
-      final JavaSparkContext jsc) {
+                               HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator,
+                               final JavaSparkContext jsc) {
     if (config.getHbaseIndexPutBatchSizeAutoCompute()) {
       SparkConf conf = jsc.getConf();
       int maxExecutors = conf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1);
@@ -370,22 +402,36 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
         If a writeStatus has any insert, it implies that the corresponding task contacts HBase for
         doing puts, since we only do puts for inserts from HBaseIndex.
        */
-      int hbasePutAccessParallelism = getHBasePutAccessParallelism(writeStatusRDD);
+      final Tuple2<Long, Integer> numPutsParallelismTuple  = getHBasePutAccessParallelism(writeStatusRDD);
+      final long numPuts = numPutsParallelismTuple._1;
+      final int hbasePutsParallelism = numPutsParallelismTuple._2;
+      this.numRegionServersForTable = getNumRegionServersAliveForTable();
+      final float desiredQPSFraction = hBaseIndexQPSResourceAllocator
+                                           .calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable);
+      logger.info("Desired QPSFraction :" + desiredQPSFraction);
+      logger.info("Number HBase puts :" + numPuts);
+      logger.info("Hbase Puts Parallelism :" + hbasePutsParallelism);
+      final float availableQpsFraction = hBaseIndexQPSResourceAllocator
+                                             .acquireQPSResources(desiredQPSFraction, numPuts);
+      logger.info("Allocated QPS Fraction :" + availableQpsFraction);
       multiPutBatchSize = putBatchSizeCalculator
           .getBatchSize(
-              getNumRegionServersAliveForTable(),
+              numRegionServersForTable,
               maxQpsPerRegionServer,
-              hbasePutAccessParallelism,
+              hbasePutsParallelism,
               maxExecutors,
               SLEEP_TIME_MILLISECONDS,
-              qpsFraction);
+              availableQpsFraction);
+      logger.info("multiPutBatchSize :" + multiPutBatchSize);
     }
   }
 
   @VisibleForTesting
-  public int getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) {
-    return Math.toIntExact(Math.max(writeStatusRDD
-        .filter(w -> w.getStat().getNumInserts() > 0).count(), 1));
+  public Tuple2<Long, Integer> getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) {
+    final JavaPairRDD<Long, Integer> insertOnlyWriteStatusRDD =
+        writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+            .mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1));
+    return insertOnlyWriteStatusRDD.reduce((w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
   }
 
   public static class HbasePutBatchSizeCalculator implements Serializable {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndexQPSResourceAllocator.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndexQPSResourceAllocator.java
new file mode 100644
index 0000000..eccdb97
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndexQPSResourceAllocator.java
@@ -0,0 +1,51 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.index.hbase;
+
+import java.io.Serializable;
+
+/**
+ * <code>HBaseIndexQPSResourceAllocator</code> defines methods to manage resource allocation for HBase index operations
+ */
+public interface HBaseIndexQPSResourceAllocator extends Serializable {
+
+  /**
+   * This method returns the QPS Fraction value that needs to be acquired such that the respective
+   * HBase index operation can be completed in desiredPutsTime.
+   *
+   * <p>NOTE(review): the caller in <code>HBaseIndex.setPutBatchSize</code> passes the number of region
+   * servers as the second argument, and <code>DefaultHBaseQPSResourceAllocator</code> names that parameter
+   * <code>numRegionServers</code>. Confirm whether the second argument is meant to be a time in seconds
+   * or a region-server count.
+   *
+   * @param numPuts                Number of inserts to be written to HBase index
+   * @param desiredPutsTimeInSecs  Total expected time for the HBase inserts operation
+   * @return QPS fraction that needs to be acquired.
+   */
+  float calculateQPSFractionForPutsTime(final long numPuts, final int desiredPutsTimeInSecs);
+
+  /**
+   * This method acquires the requested QPS Fraction against HBase cluster for index operation.
+   *
+   * @param desiredQPSFraction  QPS fraction that needs to be requested and acquired
+   * @param numPuts             Number of inserts to be written to HBase index
+   * @return value of the acquired QPS Fraction.
+   */
+  float acquireQPSResources(final float desiredQPSFraction, final long numPuts);
+
+  /**
+   * This method releases the acquired QPS Fraction
+   */
+  void releaseQPSResources();
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHBaseQPSResourceAllocator.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHBaseQPSResourceAllocator.java
new file mode 100644
index 0000000..54af10b
--- /dev/null
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHBaseQPSResourceAllocator.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.index;
+
+import com.uber.hoodie.common.HoodieClientTestUtils;
+import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.config.HoodieCompactionConfig;
+import com.uber.hoodie.config.HoodieHBaseIndexConfig;
+import com.uber.hoodie.config.HoodieIndexConfig;
+import com.uber.hoodie.config.HoodieStorageConfig;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator;
+import com.uber.hoodie.index.hbase.HBaseIndex;
+import com.uber.hoodie.index.hbase.HBaseIndexQPSResourceAllocator;
+import java.io.File;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests that {@link HBaseIndex#createQPSResourceAllocator} yields a {@link DefaultHBaseQPSResourceAllocator}
+ * when no allocator class is configured, when the default class is configured explicitly, and when the
+ * configured class name cannot be loaded.
+ */
+public class TestHBaseQPSResourceAllocator {
+  private static JavaSparkContext jsc = null;
+  private static String tableName = "test_table";
+  private String basePath = null;
+  private static HBaseTestingUtility utility;
+  private static Configuration hbaseConfig;
+  private static String QPS_TEST_SUFFIX_PATH = "qps_test_suffix";
+
+  @AfterClass
+  public static void clean() throws Exception {
+    if (jsc != null) {
+      jsc.stop();
+    }
+    // Shut down the mini cluster started in init() so it does not outlive this test class
+    if (utility != null) {
+      utility.shutdownMiniCluster();
+    }
+  }
+
+  @BeforeClass
+  public static void init() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.startMiniCluster();
+    hbaseConfig = utility.getConnection().getConfiguration();
+    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestQPSResourceAllocator"));
+  }
+
+  @After
+  public void clear() {
+    if (basePath != null) {
+      new File(basePath).delete();
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    // Create a temp folder as the base path
+    TemporaryFolder folder = new TemporaryFolder();
+    folder.create();
+    basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
+    // Initialize table
+    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
+  }
+
+  @Test
+  public void testsDefaultQPSResourceAllocator() {
+    HoodieWriteConfig config = getConfig(Optional.empty());
+    HBaseIndex index = new HBaseIndex(config);
+    HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
+    Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
+        DefaultHBaseQPSResourceAllocator.class.getName());
+    Assert.assertEquals(config.getHbaseIndexQPSFraction(),
+        hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
+  }
+
+  @Test
+  public void testsExplicitDefaultQPSResourceAllocator() {
+    HoodieWriteConfig config = getConfig(Optional.of(HoodieHBaseIndexConfig.DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
+    HBaseIndex index = new HBaseIndex(config);
+    HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
+    Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
+        DefaultHBaseQPSResourceAllocator.class.getName());
+    Assert.assertEquals(config.getHbaseIndexQPSFraction(),
+        hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
+  }
+
+  @Test
+  public void testsInvalidQPSResourceAllocator() {
+    // An unloadable class name is expected to fall back to the default allocator
+    HoodieWriteConfig config = getConfig(Optional.of("InvalidResourceAllocatorClassName"));
+    HBaseIndex index = new HBaseIndex(config);
+    HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
+    Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
+        DefaultHBaseQPSResourceAllocator.class.getName());
+    Assert.assertEquals(config.getHbaseIndexQPSFraction(),
+        hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
+  }
+
+  private HoodieWriteConfig getConfig(Optional<String> resourceAllocatorClass) {
+    HoodieHBaseIndexConfig hoodieHBaseIndexConfig = getConfigWithResourceAllocator(resourceAllocatorClass);
+    return getConfigBuilder(hoodieHBaseIndexConfig).build();
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(HoodieHBaseIndexConfig hoodieHBaseIndexConfig) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+               .withParallelism(1, 1).withCompactionConfig(
+                   HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).withInlineCompaction(false)
+                       .build()).withAutoCommit(false)
+               .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+               .forTable("test-trip-table").withIndexConfig(
+                   HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
+                       .withHBaseIndexConfig(hoodieHBaseIndexConfig)
+                       .build());
+  }
+
+  // Builds the HBase index config against the mini cluster; the allocator class is set only when given
+  private HoodieHBaseIndexConfig getConfigWithResourceAllocator(Optional<String> resourceAllocatorClass) {
+    HoodieHBaseIndexConfig.Builder builder =
+        new HoodieHBaseIndexConfig.Builder()
+            .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+            .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
+            .hbaseIndexGetBatchSize(100);
+    if (resourceAllocatorClass.isPresent()) {
+      builder.withQPSResourceAllocatorType(resourceAllocatorClass.get());
+    }
+    return builder.build();
+  }
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
index 16e67c4..5f6a56a 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -35,8 +35,10 @@ import com.uber.hoodie.config.HoodieHBaseIndexConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieStorageConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator;
 import com.uber.hoodie.index.hbase.HBaseIndex;
 import com.uber.hoodie.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator;
+import com.uber.hoodie.index.hbase.HBaseIndexQPSResourceAllocator;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.File;
 import java.util.Arrays;
@@ -64,6 +66,8 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runners.MethodSorters;
 import org.mockito.Mockito;
 
+import scala.Tuple2;
+
 /**
  * Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown across tests,
  * (see one problem here : https://issues.apache.org/jira/browse/HBASE-15835). Hence, the need to use
@@ -331,9 +335,23 @@ public class TestHbaseIndex {
             getSampleWriteStatus(0, 3),
             getSampleWriteStatus(10, 0)),
         10);
-    final int hbasePutAccessParallelism = index.getHBasePutAccessParallelism(writeStatusRDD);
+    final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
+    final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
+    final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
     Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
     Assert.assertEquals(2, hbasePutAccessParallelism);
+    Assert.assertEquals(11, hbaseNumPuts);
+  }
+
+  @Test
+  public void testsHBaseIndexDefaultQPSResourceAllocator() {
+    HoodieWriteConfig config = getConfig();
+    HBaseIndex index = new HBaseIndex(config);
+    HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
+    Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
+        DefaultHBaseQPSResourceAllocator.class.getName());
+    Assert.assertEquals(config.getHbaseIndexQPSFraction(),
+        hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
   }
 
   private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {