Posted to commits@hudi.apache.org by da...@apache.org on 2022/07/12 06:49:49 UTC
[hudi] branch master updated: [HUDI-4298] When reading the mor table with QUERY_TYPE_SNAPSHOT,Unabl… (#5937)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 994c561488 [HUDI-4298] When reading the mor table with QUERY_TYPE_SNAPSHOT,Unabl… (#5937)
994c561488 is described below
commit 994c56148859a8b7d6b6b5c068a7715167e9bb70
Author: HunterXHunter <13...@qq.com>
AuthorDate: Tue Jul 12 14:49:44 2022 +0800
[HUDI-4298] When reading the mor table with QUERY_TYPE_SNAPSHOT,Unabl… (#5937)
* [HUDI-4298] Add test case for reading mor table
Signed-off-by: LinMingQiang <13...@qq.com>
---
.../hudi/common/testutils/HoodieTestUtils.java | 4 +
.../apache/hudi/sink/ITTestDataStreamWrite.java | 6 +-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 6 +-
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 57 ++++++++
.../sink/compact/ITTestHoodieFlinkCompactor.java | 6 +-
.../org/apache/hudi/sink/utils/TestWriteBase.java | 19 +--
.../test/java/org/apache/hudi/utils/TestData.java | 146 +++++++++++++++------
7 files changed, 179 insertions(+), 65 deletions(-)
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 4e507436b9..6a2bffd34d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -66,6 +66,10 @@ public class HoodieTestUtils {
return init(getDefaultHadoopConf(), basePath, tableType);
}
+ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, Properties properties) throws IOException {
+ return init(getDefaultHadoopConf(), basePath, tableType, properties);
+ }
+
public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException {
Properties props = new Properties();
props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
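Note: the new init(basePath, tableType, properties) overload above lets a test create a table with explicit table properties. A minimal usage sketch (not part of the patch; tempDir is a hypothetical JUnit temporary directory and the usual imports are assumed):

    Properties props = new Properties();
    // Illustrative property: pin the payload class for the table under test.
    props.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
    // Creates the .hoodie metadata for a MERGE_ON_READ table under tempDir with the given props.
    HoodieTableMetaClient metaClient =
        HoodieTestUtils.init(tempDir.getAbsolutePath(), HoodieTableType.MERGE_ON_READ, props);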
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 9ed0dfb807..3d96c1cafa 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -263,7 +263,7 @@ public class ITTestDataStreamWrite extends TestLogger {
client.getJobExecutionResult().get();
}
- TestData.checkWrittenFullData(tempFile, expected);
+ TestData.checkWrittenDataCOW(tempFile, expected);
}
private void testWriteToHoodieWithCluster(
@@ -327,7 +327,7 @@ public class ITTestDataStreamWrite extends TestLogger {
// wait for the streaming job to finish
client.getJobExecutionResult().get();
- TestData.checkWrittenFullData(tempFile, expected);
+ TestData.checkWrittenDataCOW(tempFile, expected);
}
public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
@@ -449,7 +449,7 @@ public class ITTestDataStreamWrite extends TestLogger {
builder.sink(dataStream, false);
execute(execEnv, true, "Api_Sink_Test");
- TestData.checkWrittenFullData(tempFile, EXPECTED);
+ TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index e67d2ab35c..9a6aaf01e6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -250,7 +250,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.checkpoint(2)
.assertNextEvent()
.checkpointComplete(2)
- .checkWrittenFullData(EXPECTED5)
+ .checkWrittenDataCOW(EXPECTED5)
.end();
}
@@ -282,7 +282,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.checkpoint(2)
.handleEvents(2)
.checkpointComplete(2)
- .checkWrittenFullData(EXPECTED5)
+ .checkWrittenDataCOW(EXPECTED5)
.end();
}
@@ -305,7 +305,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.checkpoint(2)
.handleEvents(1)
.checkpointComplete(2)
- .checkWrittenFullData(EXPECTED5)
+ .checkWrittenDataCOW(EXPECTED5)
.end();
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index aa31d859bb..1fe48e6700 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,12 +18,15 @@
package org.apache.hudi.sink;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.Map;
@@ -84,6 +87,60 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
validateIndexLoaded();
}
+ @Test
+ public void testEventTimeAvroPayloadMergeRead() throws Exception {
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ conf.set(FlinkOptions.OPERATION, "upsert");
+ conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2);
+ conf.set(FlinkOptions.PRE_COMBINE, true);
+ conf.set(FlinkOptions.PRECOMBINE_FIELD, "ts");
+ conf.set(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
+ HashMap<String, String> mergedExpected = new HashMap<>(EXPECTED1);
+ mergedExpected.put("par1", "[id1,par1,id1,Danny,22,4,par1, id2,par1,id2,Stephen,33,2,par1]");
+ TestHarness.instance().preparePipeline(tempFile, conf)
+ .consume(TestData.DATA_SET_INSERT)
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED1, 4)
+ .consume(TestData.DATA_SET_DISORDER_INSERT)
+ .emptyEventBuffer()
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenData(mergedExpected, 4)
+ .consume(TestData.DATA_SET_SINGLE_INSERT)
+ .emptyEventBuffer()
+ .checkpoint(3)
+ .assertNextEvent()
+ .checkpointComplete(3)
+ .checkWrittenData(mergedExpected, 4)
+ .end();
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2})
+ public void testOnlyBaseFileOrOnlyLogFileRead(int compactionDeltaCommits) throws Exception {
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ conf.set(FlinkOptions.OPERATION, "upsert");
+ conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits);
+ TestHarness.instance().preparePipeline(tempFile, conf)
+ .consume(TestData.DATA_SET_INSERT)
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED1, 4)
+ .end();
+ }
+
@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.
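Background on testEventTimeAvroPayloadMergeRead: the second batch (DATA_SET_DISORDER_INSERT) arrives out of event-time order (ts 3, 4, 2, 1 for id1), and with EventTimeAvroPayload the record carrying the largest pre-combine field wins, which is why the merged expectation keeps age 22 / ts 4 for id1 even after the later single insert with ts 1. A rough sketch of that ordering rule (illustrative only, not the payload implementation):

    // Keep the record whose ordering field ("ts" in these tests) is largest,
    // regardless of arrival order; this is what the expectations above encode.
    static GenericRecord preCombine(GenericRecord stored, GenericRecord incoming, String orderingField) {
      @SuppressWarnings("unchecked")
      Comparable<Object> storedTs = (Comparable<Object>) stored.get(orderingField);
      return storedTs.compareTo(incoming.get(orderingField)) > 0 ? stored : incoming;
    }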
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 341a157e86..7e3b055add 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -161,7 +161,7 @@ public class ITTestHoodieFlinkCompactor {
env.execute("flink_hudi_compaction");
writeClient.close();
- TestData.checkWrittenFullData(tempFile, EXPECTED1);
+ TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
}
@ParameterizedTest
@@ -202,7 +202,7 @@ public class ITTestHoodieFlinkCompactor {
asyncCompactionService.shutDown();
- TestData.checkWrittenFullData(tempFile, EXPECTED2);
+ TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
}
@ParameterizedTest
@@ -281,7 +281,7 @@ public class ITTestHoodieFlinkCompactor {
env.execute("flink_hudi_compaction");
writeClient.close();
- TestData.checkWrittenFullData(tempFile, EXPECTED3);
+ TestData.checkWrittenDataCOW(tempFile, EXPECTED3);
}
private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b752258219..b6ae0767d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -22,20 +22,14 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
-import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
@@ -338,9 +332,7 @@ public class TestWriteBase {
public TestHarness checkWrittenData(
Map<String, String> expected,
int partitions) throws Exception {
- if (OptionsResolver.isCowTable(conf)
- || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
- || OptionsResolver.isAppendMode(conf)) {
+ if (OptionsResolver.isCowTable(conf)) {
TestData.checkWrittenData(this.baseFile, expected, partitions);
} else {
checkWrittenDataMor(baseFile, expected, partitions);
@@ -349,15 +341,12 @@ public class TestWriteBase {
}
private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
- HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, HadoopConfigurations.getHadoopConf(conf));
- Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
- String latestInstant = lastCompleteInstant();
FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration());
- TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
+ TestData.checkWrittenDataMOR(fs, baseFile, expected, partitions);
}
- public TestHarness checkWrittenFullData(Map<String, List<String>> expected) throws IOException {
- TestData.checkWrittenFullData(this.baseFile, expected);
+ public TestHarness checkWrittenDataCOW(Map<String, List<String>> expected) throws IOException {
+ TestData.checkWrittenDataCOW(this.baseFile, expected);
return this;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index d03e1b2eb5..d0cf143318 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -18,13 +18,22 @@
package org.apache.hudi.utils;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
@@ -49,7 +58,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.Strings;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
@@ -60,15 +68,19 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.function.Predicate;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.function.Predicate;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static junit.framework.TestCase.assertEquals;
+import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_PROPERTIES_FILE;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -278,6 +290,17 @@ public class TestData {
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ public static List<RowData> DATA_SET_DISORDER_INSERT = Arrays.asList(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1"))
+ );
+
public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
// DISORDER UPDATE
updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
@@ -598,14 +621,14 @@ public class TestData {
* @param basePath The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path
*/
- public static void checkWrittenFullData(
+ public static void checkWrittenDataCOW(
File basePath,
Map<String, List<String>> expected) throws IOException {
// 1. init flink table
HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
- HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
+ HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
// 2. check each partition data
expected.forEach((partition, partitionDataSet) -> {
@@ -638,49 +661,90 @@ public class TestData {
*
* <p>Note: Replace it with the Flink reader when it is supported.
*
- * @param fs The file system
- * @param latestInstant The latest committed instant of current table
- * @param baseFile The file base to check, should be a directory
- * @param expected The expected results mapping, the key should be the partition path
- * @param partitions The expected partition number
- * @param schema The read schema
+ * @param fs The file system
+ * @param baseFile The file base to check, should be a directory
+ * @param expected The expected results mapping, the key should be the partition path
+ * @param partitions The expected partition number
*/
public static void checkWrittenDataMOR(
FileSystem fs,
- String latestInstant,
File baseFile,
Map<String, String> expected,
- int partitions,
- Schema schema) {
+ int partitions) throws Exception {
assert baseFile.isDirectory() : "Base path should be a directory";
- FileFilter partitionFilter = file -> !file.getName().startsWith(".");
- File[] partitionDirs = baseFile.listFiles(partitionFilter);
+ String basePath = baseFile.getAbsolutePath();
+ File hoodiePropertiesFile = new File(baseFile + "/" + METAFOLDER_NAME + "/" + HOODIE_PROPERTIES_FILE);
+ assert hoodiePropertiesFile.exists();
+ // 1. init flink table
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .fromFile(hoodiePropertiesFile)
+ .withPath(basePath).build();
+ HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, config.getProps());
+ HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
+ Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+
+ String latestInstant = metaClient.getActiveTimeline().filterCompletedInstants()
+ .lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
+ assertNotNull(latestInstant, "No completed commit under table path: " + basePath);
+
+ File[] partitionDirs = baseFile.listFiles(file -> !file.getName().startsWith(".") && file.isDirectory());
assertNotNull(partitionDirs);
- assertThat(partitionDirs.length, is(partitions));
+ assertThat("The partitions number should be: " + partitions, partitionDirs.length, is(partitions));
+
+ // 2. check each partition data
+ final int[] requiredPos = IntStream.range(0, schema.getFields().size()).toArray();
for (File partitionDir : partitionDirs) {
- File[] dataFiles = partitionDir.listFiles(file ->
- file.getName().contains(".log.") && !file.getName().startsWith(".."));
- assertNotNull(dataFiles);
- List<String> logPaths = Arrays.stream(dataFiles)
- .sorted((f1, f2) -> HoodieLogFile.getLogFileComparator()
- .compare(new HoodieLogFile(f1.getPath()), new HoodieLogFile(f2.getPath())))
- .map(File::getAbsolutePath).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = getScanner(fs, baseFile.getPath(), logPaths, schema, latestInstant);
- List<String> readBuffer = scanner.getRecords().values().stream()
- .map(hoodieRecord -> {
- try {
- // in case it is a delete
- GenericRecord record = (GenericRecord) hoodieRecord.getData()
- .getInsertValue(schema, new Properties())
- .orElse(null);
- return record == null ? (String) null : filterOutVariables(record);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ List<String> readBuffer = new ArrayList<>();
+ List<FileSlice> fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList());
+ for (FileSlice fileSlice : fileSlices) {
+ HoodieMergedLogRecordScanner scanner = null;
+ List<String> logPaths = fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString())
+ .collect(Collectors.toList());
+ if (logPaths.size() > 0) {
+ scanner = getScanner(fs, basePath, logPaths, schema, latestInstant);
+ }
+ String baseFilePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+ Set<String> keyToSkip = new HashSet<>();
+ if (baseFilePath != null) {
+ // read the base file first
+ GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+ ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(baseFilePath)).build();
+ GenericRecord currentRecord = reader.read();
+ while (currentRecord != null) {
+ String curKey = currentRecord.get(HOODIE_RECORD_KEY_COL_POS).toString();
+ if (scanner != null && scanner.getRecords().containsKey(curKey)) {
+ keyToSkip.add(curKey);
+ // merge row with log.
+ final HoodieAvroRecord<?> record = (HoodieAvroRecord<?>) scanner.getRecords().get(curKey);
+ Option<IndexedRecord> combineResult = record.getData().combineAndGetUpdateValue(currentRecord, schema, config.getProps());
+ if (combineResult.isPresent()) {
+ GenericRecord avroRecord = buildAvroRecordBySchema(combineResult.get(), schema, requiredPos, recordBuilder);
+ readBuffer.add(filterOutVariables(avroRecord));
+ }
+ } else {
+ readBuffer.add(filterOutVariables(currentRecord));
}
- })
- .filter(Objects::nonNull)
- .sorted(Comparator.naturalOrder())
- .collect(Collectors.toList());
+ currentRecord = reader.read();
+ }
+ }
+ // read the remaining log data.
+ if (scanner != null) {
+ for (String curKey : scanner.getRecords().keySet()) {
+ if (!keyToSkip.contains(curKey)) {
+ Option<GenericRecord> record = (Option<GenericRecord>) scanner.getRecords()
+ .get(curKey).getData()
+ .getInsertValue(schema, config.getProps());
+ if (record.isPresent()) {
+ readBuffer.add(filterOutVariables(record.get()));
+ }
+ }
+ }
+ }
+ }
+ // Ensure that the write and read sequences are consistent.
+ readBuffer.sort(String::compareTo);
assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
}
}
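For reference, the reworked checkWrittenDataMOR resolves the table schema and latest completed instant from the meta client itself and merges each file slice's base parquet file with its log files, so callers now pass only the file system, base directory, expected map, and partition count. A minimal call-site sketch (hypothetical test variables, usual imports assumed):

    // tempFile is the table base directory used by a test; expected maps each
    // partition path to its sorted, comma-joined row string.
    FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), new org.apache.hadoop.conf.Configuration());
    TestData.checkWrittenDataMOR(fs, tempFile, expected, 4);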
@@ -722,7 +786,7 @@ public class TestData {
fields.add(genericRecord.get("age").toString());
fields.add(genericRecord.get("ts").toString());
fields.add(genericRecord.get("partition").toString());
- return Strings.join(fields, ",");
+ return String.join(",", fields);
}
public static BinaryRowData insertRow(Object... fields) {