Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/26 09:43:25 UTC
[flink-table-store] branch master updated: [FLINK-29342] Do not query writer.length() per record in RollingFileWriter
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 557594ed [FLINK-29342] Do not query writer.length() per record in RollingFileWriter
557594ed is described below
commit 557594ed42f075383f0a35c1320ba98f00171e88
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Sep 26 17:43:20 2022 +0800
[FLINK-29342] Do not query writer.length() per record in RollingFileWriter
This closes #303
---
.../table/store/file/io/RollingFileWriter.java | 11 ++-
.../store/file/data/AppendOnlyWriterTest.java | 18 ++--
.../table/store/file/io/RollingFileWriterTest.java | 99 ++++++++++++++++++++++
3 files changed, 119 insertions(+), 9 deletions(-)
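The change amortizes the cost of the file-size check: instead of asking the underlying writer for its current length on every write (a call that can be expensive, since some format writers derive the length from buffered or flushed output), the length is only consulted once every 1000 records. A minimal sketch of the pattern, with hypothetical names (AmortizedRollingWriter, doWrite, currentLength, rollToNewFile are illustrations, not the actual flink-table-store classes):

    // Sketch only: amortized rolling check, assuming a length query that
    // is too costly to run per record.
    class AmortizedRollingWriter {
        private static final int CHECK_ROLLING_RECORD_CNT = 1000;

        private final long targetFileSize;
        private long recordCount = 0;

        AmortizedRollingWriter(long targetFileSize) {
            this.targetFileSize = targetFileSize;
        }

        void write(byte[] record) throws java.io.IOException {
            doWrite(record);
            recordCount += 1;
            // Only every 1000th record pays for the length query; the
            // short-circuit keeps all other writes on the cheap path.
            if (recordCount % CHECK_ROLLING_RECORD_CNT == 0
                    && currentLength() >= targetFileSize) {
                rollToNewFile();
            }
        }

        private void doWrite(byte[] record) throws java.io.IOException {
            // append the record to the current file
        }

        private long currentLength() throws java.io.IOException {
            return 0L; // query the format writer for bytes written so far
        }

        private void rollToNewFile() throws java.io.IOException {
            // close the current file and open a new one
        }
    }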
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
index f635ac00..3c1b8fa5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
@@ -40,6 +40,8 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class);
 
+    private static final int CHECK_ROLLING_RECORD_CNT = 1000;
+
     private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
     private final long targetFileSize;
     private final List<SingleFileWriter<T, R>> openedWriters;
@@ -63,6 +65,13 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
         return targetFileSize;
     }
 
+    @VisibleForTesting
+    boolean rollingFile() throws IOException {
+        // query the writer's length only once per 1000 records
+        return recordCount % CHECK_ROLLING_RECORD_CNT == 0
+                && currentWriter.length() >= targetFileSize;
+    }
+
     @Override
     public void write(T row) throws IOException {
         try {
@@ -74,7 +83,7 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
             currentWriter.write(row);
             recordCount += 1;
 
-            if (currentWriter.length() >= targetFileSize) {
+            if (rollingFile()) {
                 closeCurrentWriter();
             }
         } catch (Throwable e) {
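One consequence of this design worth noting: rollingFile() can only return true when recordCount is an exact multiple of 1000, so a file may overshoot targetFileSize by up to 999 records' worth of bytes before the next check closes it. That appears to be the intended trade-off: a bounded amount of oversize data in exchange for removing a potentially expensive length() call from the per-record hot path.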
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index 037faeeb..9d67c7d8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -184,11 +184,11 @@ public class AppendOnlyWriterTest {
 
     @Test
     public void testRollingWrite() throws Exception {
-        // Set a very small target file size, so that we will roll over to a new file even if
-        // writing one record.
+        // Set a very small target file size, so that the rolling trigger is effectively the
+        // record count rather than the file size: rolling is only checked once per 1000 records.
         AppendOnlyWriter writer = createEmptyWriter(10L);
 
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 10 * 1000; i++) {
             writer.write(row(i, String.format("%03d", i), PART));
         }
 
@@ -204,21 +204,23 @@ public class AppendOnlyWriterTest {
             Path path = pathFactory.toPath(meta.fileName());
             assertThat(path.getFileSystem().exists(path)).isTrue();
 
-            assertThat(meta.rowCount()).isEqualTo(1L);
+            assertThat(meta.rowCount()).isEqualTo(1000L);
             assertThat(meta.minKey()).isEqualTo(EMPTY_ROW);
             assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW);
             assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS);
 
+            int min = id * 1000;
+            int max = id * 1000 + 999;
             FieldStats[] expected =
                     new FieldStats[] {
-                        initStats(id, id, 0),
-                        initStats(String.format("%03d", id), String.format("%03d", id), 0),
+                        initStats(min, max, 0),
+                        initStats(String.format("%03d", min), String.format("%03d", max), 0),
                         initStats(PART, PART, 0)
                     };
             assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected));
 
-            assertThat(meta.minSequenceNumber()).isEqualTo(id);
-            assertThat(meta.maxSequenceNumber()).isEqualTo(id);
+            assertThat(meta.minSequenceNumber()).isEqualTo(min);
+            assertThat(meta.maxSequenceNumber()).isEqualTo(max);
             assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL);
 
             id += 1;
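To see where the updated expectations come from: with a 10-byte target size, every 1000-record length check exceeds the target, so the writer rolls exactly once per 1000 records and the 10 * 1000 writes produce ten files. A small self-contained illustration of that arithmetic (the class and constant names are mine, not part of the test):

    // Sketch: expected per-file bounds when rolling every 1000 records.
    public class RollingExpectations {
        static final int TOTAL_RECORDS = 10 * 1000;
        static final int RECORDS_PER_FILE = 1000;

        public static void main(String[] args) {
            for (int id = 0; id < TOTAL_RECORDS / RECORDS_PER_FILE; id++) {
                int min = id * RECORDS_PER_FILE;       // smallest value in file `id`
                int max = id * RECORDS_PER_FILE + 999; // largest value in file `id`
                System.out.printf("file %d: rowCount=1000, min=%d, max=%d%n", id, min, max);
            }
        }
    }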
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/RollingFileWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/RollingFileWriterTest.java
new file mode 100644
index 00000000..9086187e
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/RollingFileWriterTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RollingFileWriter}. */
+public class RollingFileWriterTest {
+
+    private static final RowType SCHEMA =
+            RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"});
+
+    /**
+     * Set a very small target file size, so that the size threshold is always exceeded and the
+     * writer rolls at every 1000-record length check.
+     */
+    private static final Long TARGET_FILE_SIZE = 64L;
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private RollingFileWriter<RowData, DataFileMeta> rollingFileWriter;
+
+    @BeforeEach
+    public void beforeEach() {
+        FileFormat fileFormat = FileFormat.fromIdentifier("avro", new Configuration());
+        rollingFileWriter =
+                new RollingFileWriter<>(
+                        () ->
+                                new RowDataFileWriter(
+                                        fileFormat.createWriterFactory(SCHEMA),
+                                        new DataFilePathFactory(
+                                                        new Path(tempDir.toString()),
+                                                        "",
+                                                        0,
+                                                        CoreOptions.FILE_FORMAT.defaultValue())
+                                                .newPath(),
+                                        SCHEMA,
+                                        fileFormat.createStatsExtractor(SCHEMA).orElse(null),
+                                        0L,
+                                        new LongCounter(0)),
+                        TARGET_FILE_SIZE);
+    }
+
+    @Test
+    public void testRolling() throws IOException {
+        for (int i = 0; i < 3000; i++) {
+            rollingFileWriter.write(GenericRowData.of(i));
+            if (i < 1000) {
+                assertFileNum(1);
+            } else if (i < 2000) {
+                assertFileNum(2);
+            } else {
+                assertFileNum(3);
+            }
+        }
+    }
+
+    private void assertFileNum(int expected) {
+        File dataDir = tempDir.resolve("bucket-0").toFile();
+        File[] files = dataDir.listFiles();
+        assertThat(files).isNotNull().hasSize(expected);
+    }
+}