Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/14 03:32:09 UTC

[hudi] branch master updated: [HUDI-994] Split TestHBaseIndex to unit tests (#1818)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f5dc8ca  [HUDI-994] Split TestHBaseIndex to unit tests  (#1818)
f5dc8ca is described below

commit f5dc8ca733014d15a6d7966a5b6ae4308868adfa
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Mon Jul 13 20:32:01 2020 -0700

    [HUDI-994] Split TestHBaseIndex to unit tests  (#1818)
    
    - Refactor and improve TestHBaseIndex for performance
    - Move HBaseIndex unit tests to different test classes
---
 .../org/apache/hudi/index/hbase/HBaseIndex.java    |   8 +-
 .../apache/hudi/index/hbase/TestHBaseIndex.java    | 187 +++++++--------------
 .../hudi/index/hbase/TestHBaseIndexUsage.java      |  45 +++++
 .../hbase/TestHBasePutBatchSizeCalculator.java     |  66 ++++++++
 4 files changed, 177 insertions(+), 129 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 0f3a89a..aab12be 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -97,7 +97,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
   private Integer multiPutBatchSize;
   private Integer numRegionServersForTable;
   private final String tableName;
-  private HbasePutBatchSizeCalculator putBatchSizeCalculator;
+  private HBasePutBatchSizeCalculator putBatchSizeCalculator;
 
   public HBaseIndex(HoodieWriteConfig config) {
     super(config);
@@ -110,7 +110,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
     this.multiPutBatchSize = config.getHbaseIndexGetBatchSize();
     this.qpsFraction = config.getHbaseIndexQPSFraction();
     this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
-    this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
+    this.putBatchSizeCalculator = new HBasePutBatchSizeCalculator();
     this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
   }
 
@@ -392,10 +392,10 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
     return insertOnlyWriteStatusRDD.fold(new Tuple2<>(0L, 0), (w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
   }
 
-  public static class HbasePutBatchSizeCalculator implements Serializable {
+  public static class HBasePutBatchSizeCalculator implements Serializable {
 
     private static final int MILLI_SECONDS_IN_A_SECOND = 1000;
-    private static final Logger LOG = LogManager.getLogger(HbasePutBatchSizeCalculator.class);
+    private static final Logger LOG = LogManager.getLogger(HBasePutBatchSizeCalculator.class);
 
     /**
      * Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index d2ff4a4..6b6f11f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -20,7 +20,6 @@ package org.apache.hudi.index.hbase;
 
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -31,7 +30,6 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieTestDataGenerator;
@@ -56,15 +54,14 @@ import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atMost;
@@ -75,23 +72,20 @@ import static org.mockito.Mockito.when;
 
 /**
  * Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown across tests,
- * (see one problem here : https://issues.apache .org/jira/browse/HBASE-15835). Hence, the need to use
+ * (see one problem here : https://issues.apache.org/jira/browse/HBASE-15835). Hence, the need to use
  * {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please alter the order of tests running carefully.
  */
 @TestMethodOrder(MethodOrderer.Alphanumeric.class)
 public class TestHBaseIndex extends HoodieClientTestHarness {
 
+  private static final String TABLE_NAME = "test_table";
   private static HBaseTestingUtility utility;
   private static Configuration hbaseConfig;
-  private static String tableName = "test_table";
-
-  public TestHBaseIndex() {
-  }
 
   @AfterAll
   public static void clean() throws Exception {
     if (utility != null) {
-      utility.deleteTable(tableName);
+      utility.deleteTable(TABLE_NAME);
       utility.shutdownMiniCluster();
     }
   }
@@ -105,7 +99,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     utility = new HBaseTestingUtility(hbaseConfig);
     utility.startMiniCluster();
     hbaseConfig = utility.getConnection().getConfiguration();
-    utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
+    utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"));
   }
 
   @BeforeEach
@@ -129,9 +123,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
 
   @Test
   public void testSimpleTagLocationAndUpdate() throws Exception {
-
-    String newCommitTime = "001";
-    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+    final String newCommitTime = "001";
+    final int numRecords = 10;
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
     // Load to memory
@@ -142,8 +136,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
-      JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
 
      // Insert records
       writeClient.startCommitWithTime(newCommitTime);
@@ -152,26 +146,27 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
 
       // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
       // commit
-      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+      JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
 
       // Now commit this & update location of records inserted and validate no errors
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
       hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
-      assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
-      assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
+      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+      assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
+      assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+      assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
           && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
     }
   }
 
   @Test
   public void testTagLocationAndDuplicateUpdate() throws Exception {
-    String newCommitTime = "001";
-    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+    final String newCommitTime = "001";
+    final int numRecords = 10;
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
     // Load to memory
@@ -183,7 +178,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
-    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+    index.tagLocation(writeRecords, jsc, hoodieTable);
 
     // Duplicate upsert and ensure correctness is maintained
     // We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not
@@ -199,10 +194,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-    JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-    assertEquals(10, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
-    assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
-    assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null
+    List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
+    assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+    assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null
         && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
   }
 
@@ -213,8 +208,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
-    String newCommitTime = writeClient.startCommit();
-    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+    final String newCommitTime = writeClient.startCommit();
+    final int numRecords = 10;
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
@@ -226,13 +222,13 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     writeClient.commit(newCommitTime, writeStatues);
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should tag them
-    JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-    assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 200);
+    List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
 
     // check tagged records are tagged with correct fileIds
     List<String> fileIds = writeStatues.map(WriteStatus::getFileId).collect();
-    assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0);
-    List<String> taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
+    assertEquals(0, records2.stream().filter(record -> record.getCurrentLocation().getFileId() == null).count());
+    List<String> taggedFileIds = records2.stream().map(record -> record.getCurrentLocation().getFileId()).distinct().collect(Collectors.toList());
 
     // both lists should match
     assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds));
@@ -242,9 +238,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
     // back commit
-    javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-    assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 0);
-    assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0);
+    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
+    assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
   }
 
   @Test
@@ -255,7 +251,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Mock hbaseConnection and related entities
     Connection hbaseConnection = mock(Connection.class);
     HTable table = mock(HTable.class);
-    when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
+    when(hbaseConnection.getTable(TableName.valueOf(TABLE_NAME))).thenReturn(table);
     when(table.get((List<Get>) any())).thenReturn(new Result[0]);
 
     // only for test, set the hbaseConnection to mocked object
@@ -304,7 +300,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Mock hbaseConnection and related entities
     Connection hbaseConnection = mock(Connection.class);
     HTable table = mock(HTable.class);
-    when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
+    when(hbaseConnection.getTable(TableName.valueOf(TABLE_NAME))).thenReturn(table);
     when(table.get((List<Get>) any())).thenReturn(new Result[0]);
 
     // only for test, set the hbaseConnection to mocked object
@@ -320,42 +316,6 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testPutBatchSizeCalculation() {
-    HbasePutBatchSizeCalculator batchSizeCalculator = new HbasePutBatchSizeCalculator();
-
-    // All asserts cases below are derived out of the first
-    // example below, with change in one parameter at a time.
-
-    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
-    // Expected batchSize is 8 because in that case, total request sent in one second is below
-    // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000
-    // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request
-    // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
-    assertEquals(8, putBatchSize);
-
-    // Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved
-    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
-    assertEquals(4, putBatchSize2);
-
-    // If the parallelism is halved, batchSize has to double
-    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
-    assertEquals(16, putBatchSize3);
-
-    // If the parallelism is halved, batchSize has to double.
-    // This time parallelism is driven by numTasks rather than numExecutors
-    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
-    assertEquals(16, putBatchSize4);
-
-    // If sleepTimeMs is halved, batchSize has to halve
-    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
-    assertEquals(4, putBatchSize5);
-
-    // If maxQPSPerRegionServer is doubled, batchSize also doubles
-    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
-    assertEquals(16, putBatchSize6);
-  }
-
-  @Test
   public void testsHBasePutAccessParallelism() {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
@@ -384,20 +344,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testsHBaseIndexDefaultQPSResourceAllocator() {
-    HoodieWriteConfig config = getConfig();
-    HBaseIndex index = new HBaseIndex(config);
-    HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
-    assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
-        DefaultHBaseQPSResourceAllocator.class.getName());
-    assertEquals(config.getHbaseIndexQPSFraction(),
-        hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
-  }
-
-  @Test
   public void testSmallBatchSize() throws Exception {
-    String newCommitTime = "001";
-    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+    final String newCommitTime = "001";
+    final int numRecords = 10;
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
     // Load to memory
@@ -408,9 +358,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
-      JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
-
+      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
      // Insert records
       writeClient.startCommitWithTime(newCommitTime);
       JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -418,26 +367,27 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
 
       // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
       // commit
-      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+      JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
 
       // Now commit this & update location of records inserted and validate no errors
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
       hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
-      assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
-      assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
+      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+      assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
+      assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+      assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
           && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
     }
   }
 
   @Test
   public void testDelete() throws Exception {
-    String newCommitTime = "001";
-    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+    final String newCommitTime = "001";
+    final int numRecords = 10;
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
     // Load to memory
@@ -448,8 +398,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
-      JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
 
       // Insert records
       writeClient.startCommitWithTime(newCommitTime);
@@ -460,18 +410,17 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
       hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
-      assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
-      assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null
+      List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+      assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count());
+      assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+      assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null
           && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
 
       // Delete all records. This has to be done directly as deleting index entries
       // is not implemented via HoodieWriteClient
-      Option recordMetadata = Option.empty();
       JavaRDD<WriteStatus> deleteWriteStatues = writeStatues.map(w -> {
         WriteStatus newWriteStatus = new WriteStatus(true, 1.0);
-        w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), recordMetadata));
+        w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), Option.empty()));
         assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords());
         newWriteStatus.setStat(new HoodieWriteStat());
         return newWriteStatus;
@@ -481,26 +430,14 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       assertNoWriteErrors(deleteStatus.collect());
 
       // Ensure no records can be tagged
-      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
-      assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
-      assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
-      assertEquals(0, javaRDD.filter(record -> (record.getCurrentLocation() != null
+      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+      assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
+      assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+      assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null
           && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
     }
   }
 
-  @Test
-  public void testFeatureSupport() {
-    HoodieWriteConfig config = getConfig();
-    HBaseIndex index = new HBaseIndex(config);
-
-    assertTrue(index.canIndexLogFiles());
-    assertThrows(UnsupportedOperationException.class, () -> {
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-      index.fetchRecordLocation(jsc.parallelize(new ArrayList<HoodieKey>(), 1), jsc, hoodieTable);
-    }, "HbaseIndex supports fetchRecordLocation");
-  }
-
   private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
     final WriteStatus writeStatus = new WriteStatus(false, 0.1);
     HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
@@ -530,7 +467,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
                 .hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
                 .hbaseIndexPutBatchSizeAutoCompute(true)
                 .hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
-                .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
+                .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
                 .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
             .build());
   }
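
A recurring change in the TestHBaseIndex diff above replaces plain Java assert statements on
collect().size() with JUnit assertEquals on count(). A minimal standalone sketch of why,
assuming only a local SparkContext (the class and variable names below are illustrative and
not part of the commit):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    public class CountVsCollectSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setMaster("local[1]").setAppName("sketch"));
        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3), 1);

        // Before: `assert` is silently skipped unless the JVM runs with -ea,
        // and collect() ships every element to the driver just to take a size.
        assert rdd.filter(x -> x > 3).collect().size() == 0;

        // After: assertEquals always executes, and count() aggregates on the
        // executors without materializing the records on the driver.
        assertEquals(0, rdd.filter(x -> x > 3).count());

        jsc.stop();
      }
    }
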
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java
new file mode 100644
index 0000000..c1bf157
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index.hbase;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@ExtendWith(MockitoExtension.class)
+public class TestHBaseIndexUsage {
+
+  @Test
+  public void testFeatureSupport() {
+    HoodieWriteConfig config = mock(HoodieWriteConfig.class);
+    HBaseIndex index = new HBaseIndex(config);
+
+    assertTrue(index.canIndexLogFiles());
+    assertThrows(UnsupportedOperationException.class, () -> {
+      index.fetchRecordLocation(null, null, null);
+    }, "HBaseIndex should not support fetchRecordLocation");
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java
new file mode 100644
index 0000000..3109942
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index.hbase;
+
+import org.apache.hudi.index.hbase.HBaseIndex.HBasePutBatchSizeCalculator;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHBasePutBatchSizeCalculator {
+
+  @Test
+  public void testPutBatchSizeCalculation() {
+    HBasePutBatchSizeCalculator batchSizeCalculator = new HBasePutBatchSizeCalculator();
+
+    // All assert cases below are derived from the first
+    // example below, changing one parameter at a time.
+
+    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
+    // Expected batchSize is 8 because, in that case, total requests sent in one second stay below
+    // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction) => 16000
+    // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 requests;
+    // 1600 is roughly 10% of 16667 (maxQPSPerRegionServer), as expected.
+    assertEquals(8, putBatchSize);
+
+    // When the number of Region Servers is halved, total requests sent in a second are also halved, so batchSize is also halved
+    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
+    assertEquals(4, putBatchSize2);
+
+    // If the parallelism is halved, batchSize has to double
+    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
+    assertEquals(16, putBatchSize3);
+
+    // If the parallelism is halved, batchSize has to double.
+    // This time parallelism is driven by numTasks rather than numExecutors
+    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
+    assertEquals(16, putBatchSize4);
+
+    // If qpsFraction is halved, batchSize has to halve
+    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
+    assertEquals(4, putBatchSize5);
+
+    // If maxQPSPerRegionServer is doubled, batchSize also doubles
+    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
+    assertEquals(16, putBatchSize6);
+  }
+
+}
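
The expected values in TestHBasePutBatchSizeCalculator all fall out of a single piece of
arithmetic: take the job's share of the cluster's QPS budget and divide it by the effective
put parallelism. The sketch below is a reconstruction of that calculation from the test's
comments and expectations, not the actual HBasePutBatchSizeCalculator implementation; the
parameter names are assumptions inferred from the comments above:

    public class PutBatchSizeArithmeticSketch {

      // Reconstructed from the test above; parameter order mirrors the
      // getBatchSize(...) calls.
      static int batchSize(int numRegionServers, int maxQpsPerRegionServer,
                           int numTasks, int maxExecutors, int sleepTimeMs, float qpsFraction) {
        // The job's share of total cluster capacity,
        // e.g. 0.1 * 10 * 16667 => 16667 requests per second.
        int maxReqPerSec = (int) (qpsFraction * numRegionServers * maxQpsPerRegionServer);
        // Effective parallelism: min(numTasks, maxExecutors) workers, each issuing
        // one batch per sleep interval, i.e. 1000 / sleepTimeMs batches per second.
        int parallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
        int batchesPerSecPerTask = 1000 / sleepTimeMs;
        // Largest batch that keeps total requests per second within the budget:
        // 16667 / (200 * 10) = 8, matching the first assertion above.
        return Math.max(1, maxReqPerSec / (parallelPuts * batchesPerSecPerTask));
      }

      public static void main(String[] args) {
        System.out.println(batchSize(10, 16667, 1200, 200, 100, 0.1f)); // 8
        System.out.println(batchSize(5, 16667, 1200, 200, 100, 0.1f));  // 4
        System.out.println(batchSize(10, 16667, 1200, 100, 100, 0.1f)); // 16
      }
    }

Halving either factor in the numerator (numRegionServers or qpsFraction) halves the batch
size, while halving the parallelism in the denominator doubles it, which is exactly the
pattern the six assertions in the test encode.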