Posted to commits@hudi.apache.org by da...@apache.org on 2021/10/21 04:36:55 UTC
[hudi] branch master updated: [HUDI-2583] Refactor TestWriteCopyOnWrite test cases (#3832)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new aa3c4ec [HUDI-2583] Refactor TestWriteCopyOnWrite test cases (#3832)
aa3c4ec is described below
commit aa3c4ecda57c412b5822348d40cdf983c0035fb7
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Oct 21 12:36:41 2021 +0800
[HUDI-2583] Refactor TestWriteCopyOnWrite test cases (#3832)
---
.../sink/partitioner/profile/WriteProfile.java | 8 -
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 964 +++++----------------
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 35 -
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 6 -
.../hudi/sink/utils/InsertFunctionWrapper.java | 7 +-
.../sink/utils/StreamWriteFunctionWrapper.java | 2 +-
.../hudi/sink/utils/TestFunctionWrapper.java | 124 +++
.../org/apache/hudi/sink/utils/TestWriteBase.java | 425 +++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 11 +-
9 files changed, 776 insertions(+), 806 deletions(-)
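In short, the refactoring moves the shared expectations and assertion helpers into the new TestWriteBase and replaces the step-by-step wrapper calls in every test with a fluent TestHarness chain. A minimal before/after sketch of the testInsert case, condensed from the hunks below (method names are taken from the diff; the surrounding test setup is elided):

    // Before: drive StreamWriteFunctionWrapper step by step.
    funcWrapper.openFunction();
    for (RowData rowData : TestData.DATA_SET_INSERT) {
      funcWrapper.invoke(rowData);
    }
    funcWrapper.checkpointFunction(1);
    OperatorEvent nextEvent = funcWrapper.getNextEvent();
    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
    funcWrapper.checkpointComplete(1);
    checkWrittenData(tempFile, EXPECTED1);

    // After: the same scenario expressed through the TestHarness fluent API.
    preparePipeline()
        .consume(TestData.DATA_SET_INSERT)
        .assertEmptyDataFiles()
        .checkpoint(1)
        .assertNextEvent()
        .checkpointComplete(1)
        .checkWrittenData(EXPECTED1)
        .end();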
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index d3de247..1171a54 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -31,11 +31,9 @@ import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SmallFile;
-import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
-import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,11 +97,6 @@ public class WriteProfile {
protected AbstractTableFileSystemView fsView;
/**
- * Hadoop configuration.
- */
- private final Configuration hadoopConf;
-
- /**
* Metadata cache to reduce IO of metadata files.
*/
private final Map<String, HoodieCommitMetadata> metadataCache;
@@ -114,7 +107,6 @@ public class WriteProfile {
this.smallFilesMap = new HashMap<>();
this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
this.table = HoodieFlinkTable.create(config, context);
- this.hadoopConf = StreamerUtil.getHadoopConf();
this.metadataCache = new HashMap<>();
// profile the record statistics on construction
recordProfile();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 5b25311..d8588f8 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -19,93 +19,44 @@
package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.utils.InsertFunctionWrapper;
-import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
-import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.table.data.RowData;
-import org.hamcrest.MatcherAssert;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for stream write.
*/
-public class TestWriteCopyOnWrite {
-
- protected static final Map<String, String> EXPECTED1 = new HashMap<>();
-
- protected static final Map<String, String> EXPECTED2 = new HashMap<>();
-
- protected static final Map<String, String> EXPECTED3 = new HashMap<>();
-
- static {
- EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
- EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
- EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
- EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
-
- EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
- EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
- EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
- + "id9,par3,id9,Jane,19,6,par3]");
- EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
- + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
-
- EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
- }
+public class TestWriteCopyOnWrite extends TestWriteBase {
protected Configuration conf;
- protected StreamWriteFunctionWrapper<RowData> funcWrapper;
-
@TempDir
File tempFile;
@BeforeEach
- public void before() throws Exception {
+ public void before() {
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
setUp(conf);
- this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
}
/**
@@ -115,353 +66,148 @@ public class TestWriteCopyOnWrite {
// for sub-class extension
}
- @AfterEach
- public void after() throws Exception {
- funcWrapper.close();
- }
-
@Test
public void testCheckpoint() throws Exception {
- // open the function and ingest data
- funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- // no checkpoint, so the coordinator does not accept any events
- assertTrue(
- funcWrapper.getEventBuffer().length == 1
- && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty");
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
-
- String instant = lastPendingInstant();
-
- final OperatorEvent nextEvent = funcWrapper.getNextEvent();
- MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
- List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
- assertNotNull(writeStatuses);
- MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files
- assertThat(writeStatuses.stream()
- .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder())
- .collect(Collectors.joining(",")),
- is("par1,par2,par3,par4"));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- checkInstantState(REQUESTED, instant);
- funcWrapper.checkpointComplete(1);
- // the coordinator checkpoint commits the inflight instant.
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
- // checkpoint for next round, no data input, so after the checkpoint,
- // there should not be REQUESTED Instant
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(2);
-
- String instant2 = lastPendingInstant();
- assertNotEquals(instant, instant2);
-
- final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent2, instanceOf(WriteMetadataEvent.class));
- List<WriteStatus> writeStatuses2 = ((WriteMetadataEvent) nextEvent2).getWriteStatuses();
- assertNotNull(writeStatuses2);
- assertThat(writeStatuses2.size(), is(0)); // write empty statuses
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent2);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- funcWrapper.checkpointComplete(2);
- // started a new instant already
- checkInflightInstant();
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
+ preparePipeline()
+ .consume(TestData.DATA_SET_INSERT)
+ // no checkpoint, so the coordinator does not accept any events
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertNextEvent(4, "par1,par2,par3,par4")
+ .checkpointComplete(1)
+ // checkpoint for next round, no data input, so after the checkpoint,
+ // there should not be REQUESTED Instant
+ // this triggers the data write and event send
+ .checkpoint(2)
+ .assertEmptyEvent()
+ .emptyCheckpoint(2)
+ .end();
}
@Test
public void testCheckpointFails() throws Exception {
// reset the config option
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
- // open the function and ingest data
- funcWrapper.openFunction();
- // no data written and triggers checkpoint fails,
- // then we should revert the start instant
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
-
- String instant = lastPendingInstant();
- assertNotNull(instant);
-
- final OperatorEvent nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
- List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
- assertNotNull(writeStatuses);
- assertThat(writeStatuses.size(), is(0)); // no data write
-
- // fails the checkpoint
- funcWrapper.checkpointFails(1);
- assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(),
- "The last checkpoint was aborted, ignore the events");
-
- // the instant metadata should be reused
- checkInstantState(REQUESTED, instant);
- checkInstantState(HoodieInstant.State.COMPLETED, null);
-
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- // this returns early because there is no inflight instant
- assertDoesNotThrow(() -> funcWrapper.checkpointFunction(2),
- "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
- // do not send the write event and fails the checkpoint,
- // behaves like the last checkpoint is successful.
- funcWrapper.checkpointFails(2);
+ preparePipeline(conf)
+ // no data written and triggers checkpoint fails,
+ // then we should revert the start instant
+ .checkpoint(1)
+ .assertEmptyEvent()
+ .checkpointFails(1)
+ .consume(TestData.DATA_SET_INSERT)
+ .checkpointNotThrow(2,
+ "The stream writer reuse the last instant time when waiting for the last instant commit timeout")
+ // do not send the write event and fails the checkpoint,
+ // behaves like the last checkpoint is successful.
+ .checkpointFails(2)
+ .end();
}
@Test
public void testSubtaskFails() throws Exception {
// open the function and ingest data
- funcWrapper.openFunction();
- // no data written and triggers checkpoint fails,
- // then we should revert the start instant
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
- funcWrapper.getNextEvent();
-
- String instant1 = lastPendingInstant();
- assertNotNull(instant1);
-
- // fails the subtask
- funcWrapper.subTaskFails(0);
-
- String instant2 = lastPendingInstant();
- assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting new instant");
-
- checkInstantState(HoodieInstant.State.COMPLETED, null);
+ preparePipeline()
+ .checkpoint(1)
+ .assertEmptyEvent()
+ .subTaskFails(0)
+ .noCompleteInstant()
+ .end();
}
@Test
public void testInsert() throws Exception {
// open the function and ingest data
- funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- assertEmptyDataFiles();
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
-
- String instant = lastPendingInstant();
-
- final OperatorEvent nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- checkInstantState(REQUESTED, instant);
- funcWrapper.checkpointComplete(1);
- checkWrittenData(tempFile, EXPECTED1);
- // the coordinator checkpoint commits the inflight instant.
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
- checkWrittenData(tempFile, EXPECTED1);
+ preparePipeline()
+ .consume(TestData.DATA_SET_INSERT)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED1)
+ .end();
}
@Test
public void testInsertDuplicates() throws Exception {
// reset the config option
conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
- // open the function and ingest data
- funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
- funcWrapper.invoke(rowData);
- }
-
- assertEmptyDataFiles();
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
-
- OperatorEvent nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- funcWrapper.checkpointComplete(1);
-
- checkWrittenData(tempFile, EXPECTED3, 1);
-
- // insert duplicates again
- for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
- funcWrapper.invoke(rowData);
- }
-
- funcWrapper.checkpointFunction(2);
-
- nextEvent = funcWrapper.getNextEvent();
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- funcWrapper.checkpointComplete(2);
-
- checkWrittenData(tempFile, EXPECTED3, 1);
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED3, 1)
+ // insert duplicates again
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenData(EXPECTED3, 1)
+ .end();
}
@Test
public void testUpsert() throws Exception {
// open the function and ingest data
- funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- assertEmptyDataFiles();
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
-
- OperatorEvent nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- funcWrapper.checkpointComplete(1);
-
- // upsert another data buffer
- for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
- funcWrapper.invoke(rowData);
- }
- // the data is not flushed yet
- checkWrittenData(tempFile, EXPECTED1);
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(2);
-
- String instant = lastPendingInstant();
-
- nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- checkInstantState(REQUESTED, instant);
- funcWrapper.checkpointComplete(2);
- // the coordinator checkpoint commits the inflight instant.
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
- checkWrittenData(tempFile, EXPECTED2);
+ preparePipeline()
+ .consume(TestData.DATA_SET_INSERT)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ // upsert another data buffer
+ .consume(TestData.DATA_SET_UPDATE_INSERT)
+ // the data is not flushed yet
+ .checkWrittenData(EXPECTED1)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenData(EXPECTED2)
+ .end();
}
@Test
public void testUpsertWithDelete() throws Exception {
// open the function and ingest data
- funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- assertEmptyDataFiles();
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
-
- OperatorEvent nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- funcWrapper.checkpointComplete(1);
-
- // upsert another data buffer
- for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) {
- funcWrapper.invoke(rowData);
- }
- // the data is not flushed yet
- checkWrittenData(tempFile, EXPECTED1);
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(2);
-
- String instant = lastPendingInstant();
-
- nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- checkInstantState(REQUESTED, instant);
- funcWrapper.checkpointComplete(2);
- // the coordinator checkpoint commits the inflight instant.
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
- Map<String, String> expected = getUpsertWithDeleteExpected();
- checkWrittenData(tempFile, expected);
+ preparePipeline()
+ .consume(TestData.DATA_SET_INSERT)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .consume(TestData.DATA_SET_UPDATE_DELETE)
+ .checkWrittenData(EXPECTED1)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenData(getUpsertWithDeleteExpected())
+ .end();
}
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
- // open the function and ingest data
- funcWrapper.openFunction();
- // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
- // so 3 records expect to trigger a mini-batch write
- for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
- funcWrapper.invoke(rowData);
- }
-
- Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
- assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
- assertThat("2 records expect to flush out as a mini-batch",
- dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
- is(2));
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
- dataBuffer = funcWrapper.getDataBuffer();
- assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
- final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
- final OperatorEvent event2 = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- String instant = lastPendingInstant();
-
- funcWrapper.checkpointComplete(1);
Map<String, String> expected = getMiniBatchExpected();
- checkWrittenData(tempFile, expected, 1);
-
- // started a new instant already
- checkInflightInstant();
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
- // insert duplicates again
- for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
- funcWrapper.invoke(rowData);
- }
- funcWrapper.checkpointFunction(2);
-
- final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
- final OperatorEvent event4 = funcWrapper.getNextEvent();
- funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
- funcWrapper.checkpointComplete(2);
-
- // Same the original base file content.
- checkWrittenData(tempFile, expected, 1);
+ preparePipeline(conf)
+ // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+ // so 3 records expect to trigger a mini-batch write
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .assertDataBuffer(1, 2)
+ .checkpoint(1)
+ .allDataFlushed()
+ .handleEvents(2)
+ .checkpointComplete(1)
+ .checkWrittenData(expected, 1)
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .checkpoint(2)
+ .handleEvents(2)
+ .checkpointComplete(2)
+ .checkWrittenData(expected, 1)
+ .end();
}
@Test
@@ -469,129 +215,43 @@ public class TestWriteCopyOnWrite {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
- // open the function and ingest data
- funcWrapper.openFunction();
- // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
- // so 3 records expect to trigger a mini-batch write
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
- funcWrapper.invoke(rowData);
- }
-
- Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
- assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
- assertThat("2 records expect to flush out as a mini-batch",
- dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
- is(2));
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
- dataBuffer = funcWrapper.getDataBuffer();
- assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
- final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
- final OperatorEvent event2 = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- String instant = lastPendingInstant();
-
- funcWrapper.checkpointComplete(1);
Map<String, String> expected = new HashMap<>();
expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
- checkWrittenData(tempFile, expected, 1);
-
- // started a new instant already
- checkInflightInstant();
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
- // insert duplicates again
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
- funcWrapper.invoke(rowData);
- }
-
- funcWrapper.checkpointFunction(2);
-
- final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
- final OperatorEvent event4 = funcWrapper.getNextEvent();
- funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
- funcWrapper.checkpointComplete(2);
-
- // Same the original base file content.
- checkWrittenData(tempFile, expected, 1);
+ preparePipeline(conf)
+ // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+ // so 3 records expect to trigger a mini-batch write
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .assertDataBuffer(1, 2)
+ .checkpoint(1)
+ .allDataFlushed()
+ .handleEvents(2)
+ .checkpointComplete(1)
+ .checkWrittenData(expected, 1)
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .checkpoint(2)
+ .handleEvents(2)
+ .checkpointComplete(2)
+ .checkWrittenData(expected, 1)
+ .end();
}
@Test
public void testInsertAppendMode() throws Exception {
- InsertFunctionWrapper<RowData> funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
- // open the function and ingest data
- funcWrapper.openFunction();
- // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
- funcWrapper.invoke(rowData);
- }
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
- assertNull(funcWrapper.getWriterHelper());
-
- final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
- assertThat("The operator expect to send an event", event1, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- String instant = lastPendingInstant();
-
- funcWrapper.checkpointComplete(1);
-
- Map<String, String> expected = new HashMap<>();
-
- expected.put("par1", "["
- + "id1,par1,id1,Danny,23,0,par1, "
- + "id1,par1,id1,Danny,23,1,par1, "
- + "id1,par1,id1,Danny,23,2,par1, "
- + "id1,par1,id1,Danny,23,3,par1, "
- + "id1,par1,id1,Danny,23,4,par1]");
-
- TestData.checkWrittenAllData(tempFile, expected, 1);
-
- // started a new instant already
- checkInflightInstant();
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
- // insert duplicates again
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
- funcWrapper.invoke(rowData);
- }
-
- funcWrapper.checkpointFunction(2);
-
- final OperatorEvent event2 = funcWrapper.getNextEvent(); // remove the first event first
- funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
- funcWrapper.checkpointComplete(2);
-
- // same with the original base file content.
- expected.put("par1", "["
- + "id1,par1,id1,Danny,23,0,par1, "
- + "id1,par1,id1,Danny,23,0,par1, "
- + "id1,par1,id1,Danny,23,1,par1, "
- + "id1,par1,id1,Danny,23,1,par1, "
- + "id1,par1,id1,Danny,23,2,par1, "
- + "id1,par1,id1,Danny,23,2,par1, "
- + "id1,par1,id1,Danny,23,3,par1, "
- + "id1,par1,id1,Danny,23,3,par1, "
- + "id1,par1,id1,Danny,23,4,par1, "
- + "id1,par1,id1,Danny,23,4,par1]");
- TestData.checkWrittenAllData(tempFile, expected, 1);
+ prepareInsertPipeline()
+ // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenAllData(EXPECTED4, 1)
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenFullData(EXPECTED5)
+ .end();
}
/**
@@ -604,145 +264,54 @@ public class TestWriteCopyOnWrite {
conf.setString(FlinkOptions.OPERATION, "insert");
conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
- // open the function and ingest data
- funcWrapper.openFunction();
- // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
- // so 3 records expect to trigger a mini-batch write
- // flush the max size bucket once at a time.
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
- funcWrapper.invoke(rowData);
- }
-
- Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
- assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
- assertThat("2 records expect to flush out as a mini-batch",
- dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
- is(2));
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
- dataBuffer = funcWrapper.getDataBuffer();
- assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
- for (int i = 0; i < 2; i++) {
- final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
- assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
- funcWrapper.getCoordinator().handleEventFromOperator(0, event);
- }
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- String instant = lastPendingInstant();
-
- funcWrapper.checkpointComplete(1);
-
- Map<String, String> expected = new HashMap<>();
-
- expected.put("par1", "["
- + "id1,par1,id1,Danny,23,0,par1, "
- + "id1,par1,id1,Danny,23,1,par1, "
- + "id1,par1,id1,Danny,23,2,par1, "
- + "id1,par1,id1,Danny,23,3,par1, "
- + "id1,par1,id1,Danny,23,4,par1]");
- TestData.checkWrittenData(tempFile, expected, 1);
-
- // started a new instant already
- checkInflightInstant();
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
- // insert duplicates again
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
- funcWrapper.invoke(rowData);
- }
-
- funcWrapper.checkpointFunction(2);
-
- for (int i = 0; i < 2; i++) {
- final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
- funcWrapper.getCoordinator().handleEventFromOperator(0, event);
- }
-
- funcWrapper.checkpointComplete(2);
-
- // same with the original base file content.
- Map<String, List<String>> expected2 = new HashMap<>();
- expected2.put("par1", Arrays.asList(
- "id1,par1,id1,Danny,23,0,par1",
- "id1,par1,id1,Danny,23,0,par1",
- "id1,par1,id1,Danny,23,1,par1",
- "id1,par1,id1,Danny,23,1,par1",
- "id1,par1,id1,Danny,23,2,par1",
- "id1,par1,id1,Danny,23,2,par1",
- "id1,par1,id1,Danny,23,3,par1",
- "id1,par1,id1,Danny,23,3,par1",
- "id1,par1,id1,Danny,23,4,par1",
- "id1,par1,id1,Danny,23,4,par1"));
-
- // Same the original base file content.
- TestData.checkWrittenFullData(tempFile, expected2);
+ TestWriteMergeOnRead.TestHarness.instance()
+ // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+ // so 3 records expect to trigger a mini-batch write
+ // flush the max size bucket once at a time.
+ .preparePipeline(tempFile, conf)
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .assertDataBuffer(1, 2)
+ .checkpoint(1)
+ .allDataFlushed()
+ .handleEvents(2)
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED4, 1)
+ // insert duplicates again
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .checkpoint(2)
+ .handleEvents(2)
+ .checkpointComplete(2)
+ .checkWrittenFullData(EXPECTED5)
+ .end();
}
@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
- // open the function and ingest data
- funcWrapper.openFunction();
- // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
- // so 3 records expect to trigger a mini-batch write
- // flush the max size bucket once at a time.
- for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
- funcWrapper.invoke(rowData);
- }
-
- Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
- assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
- assertThat("2 records expect to flush out as a mini-batch",
- dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
- is(2));
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
- dataBuffer = funcWrapper.getDataBuffer();
- assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
- for (int i = 0; i < 2; i++) {
- final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
- assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
- funcWrapper.getCoordinator().handleEventFromOperator(0, event);
- }
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- String instant = lastPendingInstant();
-
- funcWrapper.checkpointComplete(1);
Map<String, String> expected = getMiniBatchExpected();
- checkWrittenData(tempFile, expected, 1);
-
- // started a new instant already
- checkInflightInstant();
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
- // insert duplicates again
- for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
- funcWrapper.invoke(rowData);
- }
- funcWrapper.checkpointFunction(2);
-
- for (int i = 0; i < 2; i++) {
- final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
- funcWrapper.getCoordinator().handleEventFromOperator(0, event);
- }
-
- funcWrapper.checkpointComplete(2);
-
- // Same the original base file content.
- checkWrittenData(tempFile, expected, 1);
+ preparePipeline(conf)
+ // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+ // so 3 records expect to trigger a mini-batch write
+ // flush the max size bucket once at a time.
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .assertDataBuffer(1, 2)
+ .checkpoint(1)
+ .allDataFlushed()
+ .handleEvents(2)
+ .checkpointComplete(1)
+ .checkWrittenData(expected, 1)
+ // insert duplicates again
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .checkpoint(2)
+ .handleEvents(2)
+ .checkpointComplete(2)
+ // Same as the original base file content.
+ .checkWrittenData(expected, 1)
+ .end();
}
protected Map<String, String> getMiniBatchExpected() {
@@ -772,71 +341,38 @@ public class TestWriteCopyOnWrite {
@Test
public void testIndexStateBootstrap() throws Exception {
// open the function and ingest data
- funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- assertEmptyDataFiles();
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
-
- OperatorEvent nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- funcWrapper.checkpointComplete(1);
-
- // the data is not flushed yet
- checkWrittenData(tempFile, EXPECTED1);
+ preparePipeline()
+ .consume(TestData.DATA_SET_INSERT)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED1, 4)
+ .end();
// reset the config option
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
- // upsert another data buffer
- funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- checkIndexLoaded(
- new HoodieKey("id1", "par1"),
- new HoodieKey("id2", "par1"),
- new HoodieKey("id3", "par2"),
- new HoodieKey("id4", "par2"),
- new HoodieKey("id5", "par3"),
- new HoodieKey("id6", "par3"),
- new HoodieKey("id7", "par4"),
- new HoodieKey("id8", "par4"),
- new HoodieKey("id9", "par3"),
- new HoodieKey("id10", "par4"),
- new HoodieKey("id11", "par4"));
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
-
- assertTrue(funcWrapper.isAlreadyBootstrap());
-
- String instant = lastPendingInstant();
-
- nextEvent = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
- Map<String, String> expected = getExpectedBeforeCheckpointComplete();
- checkWrittenData(tempFile, expected);
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
- checkInstantState(REQUESTED, instant);
-
- funcWrapper.checkpointComplete(1);
- // the coordinator checkpoint commits the inflight instant.
- checkInstantState(HoodieInstant.State.COMPLETED, instant);
- checkWrittenData(tempFile, EXPECTED2);
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_UPDATE_INSERT)
+ .checkIndexLoaded(
+ new HoodieKey("id1", "par1"),
+ new HoodieKey("id2", "par1"),
+ new HoodieKey("id3", "par2"),
+ new HoodieKey("id4", "par2"),
+ new HoodieKey("id5", "par3"),
+ new HoodieKey("id6", "par3"),
+ new HoodieKey("id7", "par4"),
+ new HoodieKey("id8", "par4"),
+ new HoodieKey("id9", "par3"),
+ new HoodieKey("id10", "par4"),
+ new HoodieKey("id11", "par4"))
+ .checkpoint(1)
+ .assertBootstrapped()
+ .assertNextEvent()
+ .checkWrittenData(getExpectedBeforeCheckpointComplete())
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED2)
+ .end();
}
@Test
@@ -844,46 +380,18 @@ public class TestWriteCopyOnWrite {
// reset the config option
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
- // open the function and ingest data
-
- funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- // no checkpoint, so the coordinator does not accept any events
- assertTrue(
- funcWrapper.getEventBuffer().length == 1
- && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty");
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
- assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit");
-
- for (int i = 0; i < 4; i++) {
- final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
- assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
- funcWrapper.getCoordinator().handleEventFromOperator(0, event);
- }
-
- funcWrapper.checkpointComplete(1);
-
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
-
- assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit");
-
- // checkpoint for the next round
- funcWrapper.checkpointFunction(2);
-
- assertDoesNotThrow(() -> {
- for (RowData rowData : TestData.DATA_SET_INSERT) {
- funcWrapper.invoke(rowData);
- }
- }, "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT)
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertConfirming()
+ .handleEvents(4)
+ .checkpointComplete(1)
+ .consume(TestData.DATA_SET_INSERT)
+ .assertNotConfirming()
+ .checkpoint(2)
+ .assertConsumeDoesNotThrow(TestData.DATA_SET_INSERT)
+ .end();
}
@Test
@@ -903,59 +411,19 @@ public class TestWriteCopyOnWrite {
// Utilities
// -------------------------------------------------------------------------
- private void checkInflightInstant() {
- final String instant = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath());
- assertNotNull(instant);
- }
-
- private void checkInstantState(HoodieInstant.State state, String instantStr) {
- final String instant;
- switch (state) {
- case REQUESTED:
- instant = lastPendingInstant();
- break;
- case COMPLETED:
- instant = lastCompleteInstant();
- break;
- default:
- throw new AssertionError("Unexpected state");
- }
- assertThat(instant, is(instantStr));
+ private TestHarness preparePipeline() throws Exception {
+ return TestHarness.instance().preparePipeline(tempFile, conf);
}
- protected String lastPendingInstant() {
- return TestUtils.getLastPendingInstant(tempFile.getAbsolutePath());
+ private TestHarness preparePipeline(Configuration conf) throws Exception {
+ return TestHarness.instance().preparePipeline(tempFile, conf);
}
- protected String lastCompleteInstant() {
- return TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+ protected TestHarness prepareInsertPipeline() throws Exception {
+ return TestHarness.instance().preparePipeline(tempFile, conf, true);
}
protected HoodieTableType getTableType() {
return HoodieTableType.COPY_ON_WRITE;
}
-
- protected void checkWrittenData(File baseFile, Map<String, String> expected) throws Exception {
- checkWrittenData(baseFile, expected, 4);
- }
-
- protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
- TestData.checkWrittenData(baseFile, expected, partitions);
- }
-
- /**
- * Asserts the data files are empty.
- */
- protected void assertEmptyDataFiles() {
- File[] dataFiles = tempFile.listFiles(file -> !file.getName().startsWith("."));
- assertNotNull(dataFiles);
- assertThat(dataFiles.length, is(0));
- }
-
- private void checkIndexLoaded(HoodieKey... keys) {
- for (HoodieKey key : keys) {
- assertTrue(funcWrapper.isKeyInState(key),
- "Key: " + key + " assumes to be in the index state");
- }
- }
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 064857a..a35a0ac 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,25 +18,11 @@
package org.apache.hudi.sink;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.utils.TestData;
-import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.jupiter.api.BeforeEach;
-import java.io.File;
import java.util.HashMap;
import java.util.Map;
@@ -44,19 +30,6 @@ import java.util.Map;
* Test cases for delta stream write.
*/
public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
- private FileSystem fs;
- private HoodieWriteConfig writeConfig;
- private HoodieFlinkEngineContext context;
-
- @BeforeEach
- public void before() throws Exception {
- super.before();
- fs = FSUtils.getFs(tempFile.getAbsolutePath(), new org.apache.hadoop.conf.Configuration());
- writeConfig = StreamerUtil.getHoodieClientConfig(conf);
- context = new HoodieFlinkEngineContext(
- new SerializableConfiguration(StreamerUtil.getHadoopConf()),
- new FlinkTaskContextSupplier(null));
- }
@Override
protected void setUp(Configuration conf) {
@@ -69,14 +42,6 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
}
@Override
- protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
- HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
- Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
- String latestInstant = lastCompleteInstant();
- TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
- }
-
- @Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 7530c89..704d94c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration;
@@ -59,9 +58,4 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
}
-
- @Override
- protected String lastCompleteInstant() {
- return TestUtils.getLastDeltaCompleteInstant(tempFile.getAbsolutePath());
- }
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 4dc197c..642a407 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -47,7 +47,7 @@ import java.util.concurrent.CompletableFuture;
*
* @param <I> Input type
*/
-public class InsertFunctionWrapper<I> {
+public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
private final Configuration conf;
private final RowType rowType;
@@ -115,6 +115,11 @@ public class InsertFunctionWrapper<I> {
return coordinator;
}
+ @Override
+ public void close() throws Exception {
+ this.coordinator.close();
+ }
+
public BulkInsertWriterHelper getWriterHelper() {
return this.writeFunction.getWriterHelper();
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c65224a..54a142a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -66,7 +66,7 @@ import java.util.concurrent.CompletableFuture;
*
* @param <I> Input type
*/
-public class StreamWriteFunctionWrapper<I> {
+public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
private final Configuration conf;
private final IOManager ioManager;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
new file mode 100644
index 0000000..d2fe819
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Define the common interfaces for test function wrappers.
+ */
+public interface TestFunctionWrapper<I> {
+ /**
+ * Open all the functions within this wrapper.
+ */
+ void openFunction() throws Exception;
+
+ /**
+ * Process the given input record {@code record}.
+ */
+ void invoke(I record) throws Exception;
+
+ /**
+ * Returns the event buffer sent by the write tasks.
+ */
+ WriteMetadataEvent[] getEventBuffer();
+
+ /**
+ * Returns the next event.
+ */
+ OperatorEvent getNextEvent();
+
+ /**
+ * Snapshot all the functions in the wrapper.
+ */
+ void checkpointFunction(long checkpointId) throws Exception;
+
+ /**
+ * Mark checkpoint with id {@code checkpointId} as successful.
+ */
+ void checkpointComplete(long checkpointId);
+
+ /**
+ * Returns the operator coordinator.
+ */
+ StreamWriteOperatorCoordinator getCoordinator();
+
+ /**
+ * Returns the data buffer of the write task.
+ */
+ default Map<String, List<HoodieRecord>> getDataBuffer() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Mark checkpoint with id {@code checkpointId} as failed.
+ */
+ default void checkpointFails(long checkpointId) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the context of the coordinator.
+ */
+ default MockOperatorCoordinatorContext getCoordinatorContext() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Mark sub-task with id {@code taskId} as failed.
+ */
+ default void subTaskFails(int taskId) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns whether the given key {@code key} is in the state store.
+ */
+ default boolean isKeyInState(HoodieKey key) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns whether the bootstrap function has already bootstrapped.
+ */
+ default boolean isAlreadyBootstrap() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns whether the write task is confirming.
+ */
+ default boolean isConforming() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Close this function wrapper.
+ */
+ void close() throws Exception;
+}
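The interface above is what lets a single harness drive both pipelines: StreamWriteFunctionWrapper and InsertFunctionWrapper each implement TestFunctionWrapper<RowData>, and the test stages only hold the interface type. A rough usage sketch, mirroring TestHarness.preparePipeline in the file below (the local variables here are illustrative; only calls that appear in this commit are used):

    // Pick the wrapper by write mode; the rest of the test flow is identical.
    TestFunctionWrapper<RowData> pipeline = append
        ? new InsertFunctionWrapper<>(basePath, conf)       // bulk-insert / append mode
        : new StreamWriteFunctionWrapper<>(basePath, conf); // streaming upsert mode
    pipeline.openFunction();
    for (RowData row : TestData.DATA_SET_INSERT) {
      pipeline.invoke(row);
    }
    pipeline.checkpointFunction(1);
    // hand the write metadata event to the coordinator, then commit the instant
    pipeline.getCoordinator().handleEventFromOperator(0, pipeline.getNextEvent());
    pipeline.checkpointComplete(1);
    pipeline.close();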
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
new file mode 100644
index 0000000..e3b1226
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.FileSystem;
+import org.hamcrest.MatcherAssert;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for write test cases.
+ */
+public class TestWriteBase {
+ protected static final Map<String, String> EXPECTED1 = new HashMap<>();
+
+ protected static final Map<String, String> EXPECTED2 = new HashMap<>();
+
+ protected static final Map<String, String> EXPECTED3 = new HashMap<>();
+
+ protected static final Map<String, String> EXPECTED4 = new HashMap<>();
+
+ protected static final Map<String, List<String>> EXPECTED5 = new HashMap<>();
+
+ static {
+ EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
+ EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
+ EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
+ EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+
+ EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
+ EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
+ EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
+ + "id9,par3,id9,Jane,19,6,par3]");
+ EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
+ + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+
+ EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+
+ EXPECTED4.put("par1", "["
+ + "id1,par1,id1,Danny,23,0,par1, "
+ + "id1,par1,id1,Danny,23,1,par1, "
+ + "id1,par1,id1,Danny,23,2,par1, "
+ + "id1,par1,id1,Danny,23,3,par1, "
+ + "id1,par1,id1,Danny,23,4,par1]");
+
+ EXPECTED5.put("par1", Arrays.asList(
+ "id1,par1,id1,Danny,23,0,par1",
+ "id1,par1,id1,Danny,23,0,par1",
+ "id1,par1,id1,Danny,23,1,par1",
+ "id1,par1,id1,Danny,23,1,par1",
+ "id1,par1,id1,Danny,23,2,par1",
+ "id1,par1,id1,Danny,23,2,par1",
+ "id1,par1,id1,Danny,23,3,par1",
+ "id1,par1,id1,Danny,23,3,par1",
+ "id1,par1,id1,Danny,23,4,par1",
+ "id1,par1,id1,Danny,23,4,par1"));
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /**
+ * Utils to compose the test stages.
+ */
+ public static class TestHarness {
+ public static TestHarness instance() {
+ return new TestHarness();
+ }
+
+ private File baseFile;
+ private String basePath;
+ private Configuration conf;
+ private TestFunctionWrapper<RowData> pipeline;
+
+ private String lastPending;
+ private String lastComplete;
+
+ public TestHarness preparePipeline(File basePath, Configuration conf) throws Exception {
+ preparePipeline(basePath, conf, false);
+ return this;
+ }
+
+ public TestHarness preparePipeline(File basePath, Configuration conf, boolean append) throws Exception {
+ this.baseFile = basePath;
+ this.basePath = this.baseFile.getAbsolutePath();
+ this.conf = conf;
+ this.pipeline = append
+ ? new InsertFunctionWrapper<>(this.basePath, conf)
+ : new StreamWriteFunctionWrapper<>(this.basePath, conf);
+ // open the function and ingest data
+ this.pipeline.openFunction();
+ return this;
+ }
+
+ public TestHarness consume(List<RowData> inputs) throws Exception {
+ for (RowData rowData : inputs) {
+ this.pipeline.invoke(rowData);
+ }
+ return this;
+ }
+
+ public TestHarness assertConsumeDoesNotThrow(List<RowData> inputs) {
+ assertDoesNotThrow(() -> {
+ consume(inputs);
+ }, "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
+ return this;
+ }
+
+ /**
+ * Assert the event buffer is empty.
+ */
+ public TestHarness emptyEventBuffer() {
+ assertTrue(
+ this.pipeline.getEventBuffer().length == 1
+ && this.pipeline.getEventBuffer()[0] == null,
+ "The coordinator events buffer expect to be empty");
+ return this;
+ }
+
+ /**
+ * Assert that the next event exists and hand it over to the coordinator.
+ */
+ public TestHarness assertNextEvent() {
+ final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+ MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
+ this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+ assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event");
+ return this;
+ }
+
+ /**
+ * Assert that the next event exists and hand it over to the coordinator.
+ *
+ * @param numWriteStatus The expected number of write statuses reported by the event
+ * @param partitions The written partitions reported by the event
+ */
+ public TestHarness assertNextEvent(int numWriteStatus, String partitions) {
+ final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+ MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
+ List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
+ assertNotNull(writeStatuses);
+ MatcherAssert.assertThat(writeStatuses.size(), is(numWriteStatus));
+ assertThat(writeStatuses.stream()
+ .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder())
+ .collect(Collectors.joining(",")),
+ is(partitions));
+ this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+ assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event");
+ return this;
+ }
+
+ /**
+ * Assert that the next event exists and hand it over to the coordinator.
+ *
+ * <p>Validates that the write metadata reported by the event is empty.
+ */
+ public TestHarness assertEmptyEvent() {
+ final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+ MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
+ List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
+ assertNotNull(writeStatuses);
+ MatcherAssert.assertThat(writeStatuses.size(), is(0));
+ this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+ assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event");
+ return this;
+ }
+
+ /**
+ * Assert that the data buffer holds the given number of buckets and records.
+ */
+ public TestHarness assertDataBuffer(int numBuckets, int numRecords) {
+ Map<String, List<HoodieRecord>> dataBuffer = this.pipeline.getDataBuffer();
+ assertThat("Should have " + numBuckets + " data bucket", dataBuffer.size(), is(numBuckets));
+ assertThat(numRecords + " records are expected to flush out as a mini-batch",
+ dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
+ is(numRecords));
+ return this;
+ }
+
+ /**
+ * Checkpoints the pipeline, which triggers the data flush and the event sending.
+ */
+ public TestHarness checkpoint(long checkpointId) throws Exception {
+ this.pipeline.checkpointFunction(checkpointId);
+ return this;
+ }
+
+ public TestHarness allDataFlushed() {
+ Map<String, List<HoodieRecord>> dataBuffer = this.pipeline.getDataBuffer();
+ assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+ return this;
+ }
+
+ /**
+ * Take the next {@code numEvents} events and hand them over to the coordinator.
+ */
+ public TestHarness handleEvents(int numEvents) {
+ for (int i = 0; i < numEvents; i++) {
+ final OperatorEvent event = this.pipeline.getNextEvent(); // dequeue the next event from the operator
+ assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
+ this.pipeline.getCoordinator().handleEventFromOperator(0, event);
+ }
+ assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event");
+ return this;
+ }
+
+ /**
+ * Mark the checkpoint with id {@code checkpointId} as finished.
+ */
+ public TestHarness checkpointComplete(long checkpointId) {
+ this.lastPending = lastPendingInstant();
+ this.pipeline.checkpointComplete(checkpointId);
+ // started a new instant already
+ checkInflightInstant();
+ checkInstantState(HoodieInstant.State.COMPLETED, lastPending);
+ this.lastComplete = lastPending;
+ this.lastPending = lastPendingInstant(); // refresh last pending instant
+ return this;
+ }
+
+ /**
+ * Mark the checkpoint with id {@code checkpointId} as finished with empty write metadata.
+ */
+ public TestHarness emptyCheckpoint(long checkpointId) {
+ String lastPending = lastPendingInstant();
+ this.pipeline.checkpointComplete(checkpointId);
+ // last pending instant was reused
+ assertEquals(this.lastPending, lastPending);
+ checkInstantState(HoodieInstant.State.COMPLETED, lastComplete);
+ return this;
+ }
+
+ /**
+ * Mark the checkpoint with id {@code checkpointId} as failed.
+ */
+ public TestHarness checkpointFails(long checkpointId) {
+ this.pipeline.checkpointFails(checkpointId);
+ assertFalse(this.pipeline.getCoordinatorContext().isJobFailed(),
+ "The last checkpoint was aborted, ignore the events");
+ // no complete instant
+ checkInstantState(HoodieInstant.State.COMPLETED, null);
+ return this;
+ }
+
+ public TestHarness checkpointNotThrow(long checkpointId, String message) {
+ // this returns early because there is no inflight instant
+ assertDoesNotThrow(() -> checkpoint(checkpointId), message);
+ return this;
+ }
+
+ /**
+ * Mark the subtask with id {@code taskId} as failed.
+ */
+ public TestHarness subTaskFails(int taskId) throws Exception {
+ // fails the subtask
+ String instant1 = lastPendingInstant();
+ this.pipeline.subTaskFails(taskId);
+
+ String instant2 = lastPendingInstant();
+ assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting a new instant");
+ return this;
+ }
+
+ public TestHarness noCompleteInstant() {
+ // no complete instant
+ checkInstantState(HoodieInstant.State.COMPLETED, null);
+ return this;
+ }
+
+ /**
+ * Asserts that no data files exist under the base path.
+ */
+ public TestHarness assertEmptyDataFiles() {
+ File[] dataFiles = baseFile.listFiles(file -> !file.getName().startsWith("."));
+ assertNotNull(dataFiles);
+ assertThat(dataFiles.length, is(0));
+ return this;
+ }
+
+ public TestHarness checkWrittenData(Map<String, String> expected) throws Exception {
+ checkWrittenData(expected, 4);
+ return this;
+ }
+
+ public TestHarness checkWrittenData(
+ Map<String, String> expected,
+ int partitions) throws Exception {
+ if (OptionsResolver.isCowTable(conf) || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+ TestData.checkWrittenData(this.baseFile, expected, partitions);
+ } else {
+ checkWrittenDataMor(baseFile, expected, partitions);
+ }
+ return this;
+ }
+
+ private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath);
+ Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+ String latestInstant = lastCompleteInstant();
+ FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration());
+ TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
+ }
+
+ public TestHarness checkWrittenFullData(Map<String, List<String>> expected) throws IOException {
+ TestData.checkWrittenFullData(this.baseFile, expected);
+ return this;
+ }
+
+ public TestHarness checkWrittenAllData(Map<String, String> expected, int partitions) throws IOException {
+ TestData.checkWrittenAllData(baseFile, expected, partitions);
+ return this;
+ }
+
+ public TestHarness checkIndexLoaded(HoodieKey... keys) {
+ for (HoodieKey key : keys) {
+ assertTrue(this.pipeline.isKeyInState(key),
+ "Key: " + key + " assumes to be in the index state");
+ }
+ return this;
+ }
+
+ public TestHarness assertBootstrapped() throws Exception {
+ assertTrue(this.pipeline.isAlreadyBootstrap());
+ return this;
+ }
+
+ public TestHarness assertConfirming() {
+ assertTrue(this.pipeline.isConforming(),
+ "The write function should be waiting for the instant to commit");
+ return this;
+ }
+
+ public TestHarness assertNotConfirming() {
+ assertFalse(this.pipeline.isConforming(),
+ "The write function should finish waiting for the instant to commit");
+ return this;
+ }
+
+ public void end() throws Exception {
+ this.pipeline.close();
+ }
+
+ private String lastPendingInstant() {
+ return TestUtils.getLastPendingInstant(basePath);
+ }
+
+ private void checkInflightInstant() {
+ final String instant = TestUtils.getLastPendingInstant(basePath);
+ assertNotNull(instant);
+ }
+
+ private void checkInstantState(HoodieInstant.State state, String instantStr) {
+ final String instant;
+ switch (state) {
+ case REQUESTED:
+ instant = lastPendingInstant();
+ break;
+ case COMPLETED:
+ instant = lastCompleteInstant();
+ break;
+ default:
+ throw new AssertionError("Unexpected state");
+ }
+ assertThat(instant, is(instantStr));
+ }
+
+ protected String lastCompleteInstant() {
+ return OptionsResolver.isMorTable(conf)
+ ? TestUtils.getLastDeltaCompleteInstant(basePath)
+ : TestUtils.getLastCompleteInstant(basePath);
+ }
+ }
+}
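For reference, subclasses such as TestWriteCopyOnWrite compose these harness stages as a fluent
chain. The following is only a minimal sketch of one write-and-commit round trip; the helpers
TestConfigurations.getDefaultConf and TestData.DATA_SET_INSERT are assumed names used here for
illustration and may not match the actual test utilities:

    // Sketch only: drives one write + checkpoint cycle through the harness.
    // TestConfigurations.getDefaultConf and TestData.DATA_SET_INSERT are assumed helper names.
    File tempFile = java.nio.file.Files.createTempDirectory("hudi-write-test").toFile();
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
    TestHarness.instance()
        .preparePipeline(tempFile, conf)    // open the stream write function
        .consume(TestData.DATA_SET_INSERT)  // buffer the input rows
        .assertEmptyDataFiles()             // nothing is flushed before the checkpoint
        .checkpoint(1)                      // flush the buffer and emit the write metadata event
        .assertNextEvent()                  // hand the event over to the coordinator
        .checkpointComplete(1)              // commit the instant on checkpoint success
        .checkWrittenData(EXPECTED1)        // verify the committed records per partition
        .end();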
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b0f7b5f..e8e177b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -18,7 +18,6 @@
package org.apache.hudi.utils;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -543,17 +542,15 @@ public class TestData {
// 1. init flink table
HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
- FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
- HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(supplier);
- HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);
+ HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
// 2. check each partition data
expected.forEach((partition, partitionDataSet) -> {
List<String> readBuffer = new ArrayList<>();
- table.getFileSystemView().getAllFileGroups(partition)
- .forEach(v -> v.getLatestDataFile().ifPresent(baseFile -> {
+ table.getBaseFileOnlyView().getLatestBaseFiles(partition)
+ .forEach(baseFile -> {
String path = baseFile.getPath();
try {
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path)).build();
@@ -565,7 +562,7 @@ public class TestData {
} catch (IOException e) {
throw new RuntimeException(e);
}
- }));
+ });
assertTrue(partitionDataSet.size() == readBuffer.size() && partitionDataSet.containsAll(readBuffer));