Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/14 03:32:09 UTC
[hudi] branch master updated: [HUDI-994] Split TestHBaseIndex to unit tests (#1818)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f5dc8ca [HUDI-994] Split TestHBaseIndex to unit tests (#1818)
f5dc8ca is described below
commit f5dc8ca733014d15a6d7966a5b6ae4308868adfa
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Mon Jul 13 20:32:01 2020 -0700
[HUDI-994] Split TestHBaseIndex to unit tests (#1818)
- Refactor and improve TestHBaseIndex for performance
- Move HBaseIndex unit tests to different test classes
---
.../org/apache/hudi/index/hbase/HBaseIndex.java | 8 +-
.../apache/hudi/index/hbase/TestHBaseIndex.java | 187 +++++++--------------
.../hudi/index/hbase/TestHBaseIndexUsage.java | 45 +++++
.../hbase/TestHBasePutBatchSizeCalculator.java | 66 ++++++++
4 files changed, 177 insertions(+), 129 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 0f3a89a..aab12be 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -97,7 +97,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
private Integer multiPutBatchSize;
private Integer numRegionServersForTable;
private final String tableName;
- private HbasePutBatchSizeCalculator putBatchSizeCalculator;
+ private HBasePutBatchSizeCalculator putBatchSizeCalculator;
public HBaseIndex(HoodieWriteConfig config) {
super(config);
@@ -110,7 +110,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
this.multiPutBatchSize = config.getHbaseIndexGetBatchSize();
this.qpsFraction = config.getHbaseIndexQPSFraction();
this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
- this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
+ this.putBatchSizeCalculator = new HBasePutBatchSizeCalculator();
this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
}
@@ -392,10 +392,10 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
return insertOnlyWriteStatusRDD.fold(new Tuple2<>(0L, 0), (w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
}
- public static class HbasePutBatchSizeCalculator implements Serializable {
+ public static class HBasePutBatchSizeCalculator implements Serializable {
private static final int MILLI_SECONDS_IN_A_SECOND = 1000;
- private static final Logger LOG = LogManager.getLogger(HbasePutBatchSizeCalculator.class);
+ private static final Logger LOG = LogManager.getLogger(HBasePutBatchSizeCalculator.class);
/**
* Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed
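The hunk above only fixes the class casing (HbasePutBatchSizeCalculator to HBasePutBatchSizeCalculator); behavior is unchanged. For orientation, here is a minimal sketch of calling the renamed calculator directly, reusing the first argument set from TestHBasePutBatchSizeCalculator further below. The parameter meanings (numRegionServers, maxQPSPerRegionServer, numTasks, maxExecutors, sleepTimeMs, qpsFraction) are inferred from the test comments and should be treated as an assumption:

    import org.apache.hudi.index.hbase.HBaseIndex.HBasePutBatchSizeCalculator;

    public class PutBatchSizeSketch {
      public static void main(String[] args) {
        HBasePutBatchSizeCalculator calculator = new HBasePutBatchSizeCalculator();
        // Same argument set as the first case in TestHBasePutBatchSizeCalculator;
        // the test expects a batch size of 8 for these inputs.
        int batchSize = calculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
        System.out.println("putBatchSize = " + batchSize); // 8
      }
    }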
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index d2ff4a4..6b6f11f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -20,7 +20,6 @@ package org.apache.hudi.index.hbase;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -31,7 +30,6 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
@@ -56,15 +54,14 @@ import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import scala.Tuple2;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atMost;
@@ -75,23 +72,20 @@ import static org.mockito.Mockito.when;
/**
* Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown across tests,
- * (see one problem here : https://issues.apache .org/jira/browse/HBASE-15835). Hence, the need to use
+ * (see one problem here : https://issues.apache.org/jira/browse/HBASE-15835). Hence, the need to use
 * {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please take care when altering the order in which tests run.
*/
@TestMethodOrder(MethodOrderer.Alphanumeric.class)
public class TestHBaseIndex extends HoodieClientTestHarness {
+ private static final String TABLE_NAME = "test_table";
private static HBaseTestingUtility utility;
private static Configuration hbaseConfig;
- private static String tableName = "test_table";
-
- public TestHBaseIndex() {
- }
@AfterAll
public static void clean() throws Exception {
if (utility != null) {
- utility.deleteTable(tableName);
+ utility.deleteTable(TABLE_NAME);
utility.shutdownMiniCluster();
}
}
@@ -105,7 +99,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
utility = new HBaseTestingUtility(hbaseConfig);
utility.startMiniCluster();
hbaseConfig = utility.getConnection().getConfiguration();
- utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
+ utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"));
}
@BeforeEach
@@ -129,9 +123,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
@Test
public void testSimpleTagLocationAndUpdate() throws Exception {
-
- String newCommitTime = "001";
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+ final String newCommitTime = "001";
+ final int numRecords = 10;
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
// Load to memory
@@ -142,8 +136,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert records
writeClient.startCommitWithTime(newCommitTime);
@@ -152,26 +146,27 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
// commit
- javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+ JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
// Now commit this & update location of records inserted and validate no errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
- assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
- assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
+ assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+ assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
}
}
@Test
public void testTagLocationAndDuplicateUpdate() throws Exception {
- String newCommitTime = "001";
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+ final String newCommitTime = "001";
+ final int numRecords = 10;
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
// Load to memory
@@ -183,7 +178,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
- JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ index.tagLocation(writeRecords, jsc, hoodieTable);
// Duplicate upsert and ensure correctness is maintained
// We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not
@@ -199,10 +194,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assertEquals(10, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
- assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
- assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null
+ List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
+ assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+ assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
}
@@ -213,8 +208,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HBaseIndex index = new HBaseIndex(config);
HoodieWriteClient writeClient = getHoodieWriteClient(config);
- String newCommitTime = writeClient.startCommit();
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+ final String newCommitTime = writeClient.startCommit();
+ final int numRecords = 10;
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -226,13 +222,13 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
writeClient.commit(newCommitTime, writeStatues);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Now tagLocation for these records, hbaseIndex should tag them
- JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 200);
+ List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
// check tagged records are tagged with correct fileIds
List<String> fileIds = writeStatues.map(WriteStatus::getFileId).collect();
- assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0);
- List<String> taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
+ assertEquals(0, records2.stream().filter(record -> record.getCurrentLocation().getFileId() == null).count());
+ List<String> taggedFileIds = records2.stream().map(record -> record.getCurrentLocation().getFileId()).distinct().collect(Collectors.toList());
// both lists should match
assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds));
@@ -242,9 +238,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
// back commit
- javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 0);
- assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0);
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
+ assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
}
@Test
@@ -255,7 +251,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Mock hbaseConnection and related entities
Connection hbaseConnection = mock(Connection.class);
HTable table = mock(HTable.class);
- when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
+ when(hbaseConnection.getTable(TableName.valueOf(TABLE_NAME))).thenReturn(table);
when(table.get((List<Get>) any())).thenReturn(new Result[0]);
// only for test, set the hbaseConnection to mocked object
@@ -304,7 +300,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Mock hbaseConnection and related entities
Connection hbaseConnection = mock(Connection.class);
HTable table = mock(HTable.class);
- when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
+ when(hbaseConnection.getTable(TableName.valueOf(TABLE_NAME))).thenReturn(table);
when(table.get((List<Get>) any())).thenReturn(new Result[0]);
// only for test, set the hbaseConnection to mocked object
@@ -320,42 +316,6 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
}
@Test
- public void testPutBatchSizeCalculation() {
- HbasePutBatchSizeCalculator batchSizeCalculator = new HbasePutBatchSizeCalculator();
-
- // All assert cases below are derived from the first
- // example below, changing one parameter at a time.
-
- int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
- // Expected batchSize is 8 because, in that case, the total requests sent in one second are below
- // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction) => 16000
- // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 requests
- // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
- assertEquals(8, putBatchSize);
-
- // The number of Region Servers is halved; total requests sent in a second are also halved, so batchSize is also halved
- int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
- assertEquals(4, putBatchSize2);
-
- // If the parallelism is halved, batchSize has to double
- int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
- assertEquals(16, putBatchSize3);
-
- // If the parallelism is halved, batchSize has to double.
- // This time parallelism is driven by numTasks rather than numExecutors
- int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
- assertEquals(16, putBatchSize4);
-
- // If qpsFraction is halved, batchSize has to halve
- int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
- assertEquals(4, putBatchSize5);
-
- // If maxQPSPerRegionServer is doubled, batchSize also doubles
- int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
- assertEquals(16, putBatchSize6);
- }
-
- @Test
public void testsHBasePutAccessParallelism() {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
@@ -384,20 +344,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
}
@Test
- public void testsHBaseIndexDefaultQPSResourceAllocator() {
- HoodieWriteConfig config = getConfig();
- HBaseIndex index = new HBaseIndex(config);
- HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
- assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
- DefaultHBaseQPSResourceAllocator.class.getName());
- assertEquals(config.getHbaseIndexQPSFraction(),
- hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
- }
-
- @Test
public void testSmallBatchSize() throws Exception {
- String newCommitTime = "001";
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+ final String newCommitTime = "001";
+ final int numRecords = 10;
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
// Load to memory
@@ -408,9 +358,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
-
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert records
writeClient.startCommitWithTime(newCommitTime);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -418,26 +367,27 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
// commit
- javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+ JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
// Now commit this & update location of records inserted and validate no errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
- assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
- assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
+ assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+ assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
}
}
@Test
public void testDelete() throws Exception {
- String newCommitTime = "001";
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+ final String newCommitTime = "001";
+ final int numRecords = 10;
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
// Load to memory
@@ -448,8 +398,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert records
writeClient.startCommitWithTime(newCommitTime);
@@ -460,18 +410,17 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
- assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
- assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null
+ List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count());
+ assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+ assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
// Delete all records. This has to be done directly as deleting index entries
// is not implemented via HoodieWriteClient
- Option recordMetadata = Option.empty();
JavaRDD<WriteStatus> deleteWriteStatues = writeStatues.map(w -> {
WriteStatus newWriteStatus = new WriteStatus(true, 1.0);
- w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), recordMetadata));
+ w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), Option.empty()));
assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords());
newWriteStatus.setStat(new HoodieWriteStat());
return newWriteStatus;
@@ -481,26 +430,14 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
assertNoWriteErrors(deleteStatus.collect());
// Ensure no records can be tagged
- javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
- assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
- assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
- assertEquals(0, javaRDD.filter(record -> (record.getCurrentLocation() != null
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
+ assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
+ assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
}
}
- @Test
- public void testFeatureSupport() {
- HoodieWriteConfig config = getConfig();
- HBaseIndex index = new HBaseIndex(config);
-
- assertTrue(index.canIndexLogFiles());
- assertThrows(UnsupportedOperationException.class, () -> {
- HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- index.fetchRecordLocation(jsc.parallelize(new ArrayList<HoodieKey>(), 1), jsc, hoodieTable);
- }, "HbaseIndex supports fetchRecordLocation");
- }
-
private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
final WriteStatus writeStatus = new WriteStatus(false, 0.1);
HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
@@ -530,7 +467,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
.hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
.hbaseIndexPutBatchSizeAutoCompute(true)
.hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
- .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
+ .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
.hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
.build());
}
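One pattern worth noting in the rewritten tests above: the old assertions each triggered their own Spark action (collect() or count()) on the tagged RDD, while the new code collects once and runs all remaining checks on the local List via java.util.stream. A self-contained sketch of that refactoring pattern, illustrative only and not part of this commit:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CollectOnceSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[1]", "collect-once");
        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4), 1);

        // Before: every assertion triggers its own distributed job.
        long evens = rdd.filter(n -> n % 2 == 0).count();   // job 1
        long odds = rdd.filter(n -> n % 2 != 0).count();    // job 2

        // After: one job, then cheap local stream operations.
        List<Integer> local = rdd.collect();                // single job
        long evensLocal = local.stream().filter(n -> n % 2 == 0).count();
        long oddsLocal = local.stream().filter(n -> n % 2 != 0).count();

        System.out.println(evens == evensLocal && odds == oddsLocal); // true
        jsc.stop();
      }
    }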
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java
new file mode 100644
index 0000000..c1bf157
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index.hbase;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@ExtendWith(MockitoExtension.class)
+public class TestHBaseIndexUsage {
+
+ @Test
+ public void testFeatureSupport() {
+ HoodieWriteConfig config = mock(HoodieWriteConfig.class);
+ HBaseIndex index = new HBaseIndex(config);
+
+ assertTrue(index.canIndexLogFiles());
+ assertThrows(UnsupportedOperationException.class, () -> {
+ index.fetchRecordLocation(null, null, null);
+ }, "HBaseIndex should not support fetchRecordLocation");
+ }
+}
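TestHBaseIndexUsage builds HBaseIndex against a Mockito mock of HoodieWriteConfig instead of spinning up the HBase mini-cluster; unstubbed getters on the mock return type defaults (0, null, false), which is enough for the constructor. A minimal sketch of stubbing a getter when a test needs a concrete value; the getter name getHbaseTableName() is an assumption inferred from the hbaseTableName(...) builder call in TestHBaseIndex above:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hudi.config.HoodieWriteConfig;

    public class MockedConfigSketch {
      public static void main(String[] args) {
        HoodieWriteConfig config = mock(HoodieWriteConfig.class);
        // Hypothetical stub; assumes HoodieWriteConfig exposes getHbaseTableName()
        // for the hbaseTableName(...) option used by the tests above.
        when(config.getHbaseTableName()).thenReturn("test_table");
        System.out.println(config.getHbaseTableName()); // test_table
      }
    }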
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java
new file mode 100644
index 0000000..3109942
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index.hbase;
+
+import org.apache.hudi.index.hbase.HBaseIndex.HBasePutBatchSizeCalculator;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHBasePutBatchSizeCalculator {
+
+ @Test
+ public void testPutBatchSizeCalculation() {
+ HBasePutBatchSizeCalculator batchSizeCalculator = new HBasePutBatchSizeCalculator();
+
+ // All assert cases below are derived from the first
+ // example below, changing one parameter at a time.
+
+ int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
+ // Expected batchSize is 8 because, in that case, the total requests sent in one second are below
+ // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction) => 16000
+ // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 requests
+ // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
+ assertEquals(8, putBatchSize);
+
+ // The number of Region Servers is halved; total requests sent in a second are also halved, so batchSize is also halved
+ int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
+ assertEquals(4, putBatchSize2);
+
+ // If the parallelism is halved, batchSize has to double
+ int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
+ assertEquals(16, putBatchSize3);
+
+ // If the parallelism is halved, batchSize has to double.
+ // This time parallelism is driven by numTasks rather than numExecutors
+ int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
+ assertEquals(16, putBatchSize4);
+
+ // If qpsFraction is halved, batchSize has to halve
+ int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
+ assertEquals(4, putBatchSize5);
+
+ // If maxQPSPerRegionServer is doubled, batchSize also doubles
+ int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
+ assertEquals(16, putBatchSize6);
+ }
+
+}
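For readers verifying the expected values in the test above, all six cases are consistent with the following reconstruction of the calculation. This is a sketch inferred from the test comments, not necessarily the exact arithmetic inside HBasePutBatchSizeCalculator:

    public class BatchSizeDerivationSketch {
      // Reconstructed from the test comments; parameter order mirrors getBatchSize.
      // Not the actual Hudi implementation, just one formula that reproduces it.
      static int batchSize(int numRegionServers, int maxQpsPerRegionServer,
                           int numTasks, int maxExecutors,
                           int sleepTimeMs, float qpsFraction) {
        int maxReqPerSecPerRegionServer = (int) (qpsFraction * maxQpsPerRegionServer); // 0.1 * 16667 = 1666
        int maxReqPerSecTotal = maxReqPerSecPerRegionServer * numRegionServers;        // 1666 * 10 = 16660
        int parallelism = Math.min(numTasks, maxExecutors);                            // min(1200, 200) = 200
        int batchesPerSecPerTask = 1000 / sleepTimeMs;                                 // 1000 / 100 = 10
        return maxReqPerSecTotal / (parallelism * batchesPerSecPerTask);               // 16660 / 2000 = 8
      }

      public static void main(String[] args) {
        System.out.println(batchSize(10, 16667, 1200, 200, 100, 0.1f));  // 8
        System.out.println(batchSize(5, 16667, 1200, 200, 100, 0.1f));   // 4
        System.out.println(batchSize(10, 16667, 1200, 100, 100, 0.1f));  // 16
        System.out.println(batchSize(10, 16667, 100, 200, 100, 0.1f));   // 16
        System.out.println(batchSize(10, 16667, 1200, 200, 100, 0.05f)); // 4
        System.out.println(batchSize(10, 33334, 1200, 200, 100, 0.1f));  // 16
      }
    }

The integer divisions truncate at each step, which is why every expected value in the test lands on an exact integer.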