You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/15 00:29:00 UTC
[hudi] branch master updated: [HUDI-996] Add functional test in hudi-client (#1824)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b399b4a [HUDI-996] Add functional test in hudi-client (#1824)
b399b4a is described below
commit b399b4ad43b8caa69f129e756e51ad4bc8c81de2
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Tue Jul 14 17:28:50 2020 -0700
[HUDI-996] Add functional test in hudi-client (#1824)
- Add functional test suite in hudi-client
- Tag TestHBaseIndex as functional
---
hudi-client/pom.xml | 18 +++++
...rovider.java => ClientFunctionalTestSuite.java} | 20 +++---
.../apache/hudi/index/hbase/TestHBaseIndex.java | 80 ++++++++++------------
.../hudi/testutils/FunctionalTestHarness.java | 52 ++++++++++++--
.../testutils/{ => providers}/DFSProvider.java | 2 +-
.../HoodieMetaClientProvider.java} | 22 +++---
.../HoodieWriteClientProvider.java} | 16 ++---
.../testutils/{ => providers}/SparkProvider.java | 2 +-
8 files changed, 131 insertions(+), 81 deletions(-)
diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 326cf83..a390923 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -268,5 +268,23 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-runner</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-suite-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-commons</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java
similarity index 70%
copy from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
copy to hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java
index 16cc471..4e62618 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java
@@ -17,18 +17,16 @@
* under the License.
*/
-package org.apache.hudi.testutils;
+package org.apache.hudi;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.platform.suite.api.IncludeTags;
+import org.junit.platform.suite.api.SelectPackages;
+import org.junit.runner.RunWith;
-public interface DFSProvider {
-
- MiniDFSCluster dfsCluster();
-
- DistributedFileSystem dfs();
-
- Path dfsBasePath();
+@RunWith(JUnitPlatform.class)
+@SelectPackages("org.apache.hudi.index")
+@IncludeTags("functional")
+public class ClientFunctionalTestSuite {
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index 6b6f11f..b3d2f5a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -31,7 +31,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.hadoop.conf.Configuration;
@@ -47,10 +47,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
@@ -76,12 +76,17 @@ import static org.mockito.Mockito.when;
* {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please alter the order of tests running carefully.
*/
@TestMethodOrder(MethodOrderer.Alphanumeric.class)
-public class TestHBaseIndex extends HoodieClientTestHarness {
+@Tag("functional")
+public class TestHBaseIndex extends FunctionalTestHarness {
private static final String TABLE_NAME = "test_table";
private static HBaseTestingUtility utility;
private static Configuration hbaseConfig;
+ private Configuration hadoopConf;
+ private HoodieTestDataGenerator dataGen;
+ private HoodieTableMetaClient metaClient;
+
@AfterAll
public static void clean() throws Exception {
if (utility != null) {
@@ -104,21 +109,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
@BeforeEach
public void setUp() throws Exception {
- // Initialize a local spark env
- initSparkContexts("TestHBaseIndex");
+ hadoopConf = jsc().hadoopConfiguration();
hadoopConf.addResource(utility.getConfiguration());
-
- // Create a temp folder as the base path
- initPath();
- initTestDataGenerator();
- initMetaClient();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupSparkContexts();
- cleanupTestDataGenerator();
- cleanupClients();
+ metaClient = getHoodieMetaClient(hadoopConf, basePath());
+ dataGen = new HoodieTestDataGenerator();
}
@Test
@@ -126,7 +120,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig();
@@ -136,7 +130,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert 200 records
@@ -146,7 +140,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
// commit
- JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
// Now commit this & update location of records inserted and validate no errors
@@ -154,7 +148,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -167,7 +161,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig();
@@ -178,7 +172,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
- index.tagLocation(writeRecords, jsc, hoodieTable);
+ index.tagLocation(writeRecords, jsc(), hoodieTable);
// Duplicate upsert and ensure correctness is maintained
// We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not
@@ -194,7 +188,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null
@@ -211,7 +205,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = writeClient.startCommit();
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
// Insert 200 records
@@ -222,7 +216,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
writeClient.commit(newCommitTime, writeStatues);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Now tagLocation for these records, hbaseIndex should tag them
- List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
// check tagged records are tagged with correct fileIds
@@ -238,7 +232,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
// back commit
- List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
}
@@ -262,7 +256,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// start a commit and generate test data
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
@@ -271,7 +265,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
assertNoWriteErrors(writeStatues.collect());
// Now tagLocation for these records, hbaseIndex should tag them
- index.tagLocation(writeRecords, jsc, hoodieTable);
+ index.tagLocation(writeRecords, jsc(), hoodieTable);
// 3 batches should be executed given batchSize = 100 and parallelism = 1
verify(table, times(3)).get((List<Get>) any());
@@ -287,7 +281,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// start a commit and generate test data
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
@@ -309,7 +303,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Get all the files generated
int numberOfDataFileIds = (int) writeStatues.map(status -> status.getFileId()).distinct().count();
- index.updateLocation(writeStatues, jsc, hoodieTable);
+ index.updateLocation(writeStatues, jsc(), hoodieTable);
// 3 batches should be executed given batchSize = 100 and <=numberOfDataFileIds getting updated,
// so each fileId ideally gets updates
verify(table, atMost(numberOfDataFileIds)).put((List<Put>) any());
@@ -319,7 +313,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
public void testsHBasePutAccessParallelism() {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
- final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
+ final JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(
Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10);
final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
@@ -334,7 +328,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
final JavaRDD<WriteStatus> writeStatusRDD =
- jsc.parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);
+ jsc().parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);
final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
@@ -348,7 +342,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig(2);
@@ -358,7 +352,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert 200 records
writeClient.startCommitWithTime(newCommitTime);
@@ -367,7 +361,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
// commit
- JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
// Now commit this & update location of records inserted and validate no errors
@@ -375,7 +369,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -388,7 +382,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig();
@@ -398,7 +392,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert records
@@ -410,7 +404,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count());
assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null
@@ -425,12 +419,12 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
newWriteStatus.setStat(new HoodieWriteStat());
return newWriteStatus;
});
- JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc, hoodieTable);
+ JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc(), hoodieTable);
assertEquals(deleteStatus.count(), deleteWriteStatues.count());
assertNoWriteErrors(deleteStatus.collect());
// Ensure no records can be tagged
- List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -456,7 +450,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
}
private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
- return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(1, 1)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 562563e..27db072 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -19,14 +19,26 @@
package org.apache.hudi.testutils;
+import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
-
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.providers.DFSProvider;
+import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
+import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
+import org.apache.hudi.testutils.providers.SparkProvider;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
@@ -35,8 +47,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
-public class FunctionalTestHarness implements SparkProvider, DFSProvider {
+public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
private static transient SparkSession spark;
private static transient SQLContext sqlContext;
@@ -53,6 +70,10 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider {
@TempDir
protected java.nio.file.Path tempDir;
+ public String basePath() {
+ return tempDir.toAbsolutePath().toString();
+ }
+
@Override
public SparkSession spark() {
return spark;
@@ -83,15 +104,32 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider {
return dfs.getWorkingDirectory();
}
+ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
+ return getHoodieMetaClient(hadoopConf, basePath, new Properties());
+ }
+
+ @Override
+ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
+ props.putIfAbsent(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, PARQUET.toString());
+ props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
+ props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, COPY_ON_WRITE.name());
+ props.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+ return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+ }
+
+ @Override
+ public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
+ return new HoodieWriteClient(jsc, cfg, false, HoodieIndex.createIndex(cfg));
+ }
+
@BeforeEach
public synchronized void runBeforeEach() throws Exception {
initialized = spark != null && hdfsTestService != null;
if (!initialized) {
- FileSystem.closeAll();
-
- spark = SparkSession.builder()
- .config(HoodieWriteClient.registerClasses(conf()))
- .getOrCreate();
+ SparkConf sparkConf = conf();
+ HoodieWriteClient.registerClasses(sparkConf);
+ HoodieReadClient.addHoodieSupport(sparkConf);
+ spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
similarity index 95%
copy from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
copy to hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
index 16cc471..62b48cb 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
similarity index 50%
copy from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
copy to hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
index 16cc471..0cd7ed5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
@@ -17,18 +17,24 @@
* under the License.
*/
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-public interface DFSProvider {
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
- MiniDFSCluster dfsCluster();
+import java.io.IOException;
+import java.util.Properties;
- DistributedFileSystem dfs();
+public interface HoodieMetaClientProvider {
- Path dfsBasePath();
+ HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException;
+ default HoodieTableFileSystemView getHoodieTableFileSystemView(
+ HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, FileStatus[] fileStatuses) {
+ return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
+ }
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java
similarity index 73%
rename from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
rename to hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java
index 16cc471..4840af0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java
@@ -17,18 +17,14 @@
* under the License.
*/
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.config.HoodieWriteConfig;
-public interface DFSProvider {
+import java.io.IOException;
- MiniDFSCluster dfsCluster();
-
- DistributedFileSystem dfs();
-
- Path dfsBasePath();
+public interface HoodieWriteClientProvider {
+ HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException;
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
similarity index 97%
rename from hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java
rename to hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
index 948f736..cdf5ac4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;