You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text version.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/09/29 15:29:20 UTC
[hudi] branch master updated: [MINOR] Use base path URI in ITTestDataStreamWrite (#6826)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a65c3b798b [MINOR] Use base path URI in ITTestDataStreamWrite (#6826)
a65c3b798b is described below
commit a65c3b798bc94ab75993be0ea9bd17a00404fc6c
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Thu Sep 29 20:59:14 2022 +0530
[MINOR] Use base path URI in ITTestDataStreamWrite (#6826)
---
.../java/org/apache/hudi/sink/ITTestDataStreamWrite.java | 16 ++++++++--------
.../src/test/java/org/apache/hudi/utils/TestData.java | 4 ++--
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index e6d2ddb7b5..193c0abcd8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -100,7 +100,7 @@ public class ITTestDataStreamWrite extends TestLogger {
@ParameterizedTest
@ValueSource(strings = {"BUCKET", "FLINK_STATE"})
public void testWriteCopyOnWrite(String indexType) throws Exception {
- Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
@@ -146,7 +146,7 @@ public class ITTestDataStreamWrite extends TestLogger {
@ParameterizedTest
@ValueSource(strings = {"BUCKET", "FLINK_STATE"})
public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception {
- Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
@@ -167,7 +167,7 @@ public class ITTestDataStreamWrite extends TestLogger {
}
private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
- Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.OPERATION, "insert");
@@ -182,7 +182,7 @@ public class ITTestDataStreamWrite extends TestLogger {
Transformer transformer,
String jobName,
Map<String, List<String>> expected) throws Exception {
- testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()),
+ testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.toURI().toString()),
Option.of(transformer), jobName, 2, expected);
}
@@ -336,7 +336,7 @@ public class ITTestDataStreamWrite extends TestLogger {
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.TABLE_NAME, "t1");
conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
@@ -345,10 +345,10 @@ public class ITTestDataStreamWrite extends TestLogger {
TestData.writeData(TestData.dataSetInsert(3, 4), conf);
TestData.writeData(TestData.dataSetInsert(5, 6), conf);
- String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+ String latestCommit = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
Map<String, String> options = new HashMap<>();
- options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);
//read a hoodie table use low-level source api.
@@ -378,7 +378,7 @@ public class ITTestDataStreamWrite extends TestLogger {
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
Configuration conf = Configuration.fromMap(options);
// Read from file source
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index e7e34cc15a..8e1dd9964c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -626,8 +626,8 @@ public class TestData {
Map<String, List<String>> expected) throws IOException {
// 1. init flink table
- HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
- HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
+ HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.toURI().toString());
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.toURI().toString()).build();
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
// 2. check each partition data