You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/26 09:43:25 UTC

[flink-table-store] branch master updated: [FLINK-29342] Do not query writer.length() per record in RollingFileWriter

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 557594ed [FLINK-29342] Do not query writer.length() per record in RollingFileWriter
557594ed is described below

commit 557594ed42f075383f0a35c1320ba98f00171e88
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Sep 26 17:43:20 2022 +0800

    [FLINK-29342] Do not query writer.length() per record in RollingFileWriter
    
    This closes #303
---
 .../table/store/file/io/RollingFileWriter.java     | 11 ++-
 .../store/file/data/AppendOnlyWriterTest.java      | 18 ++--
 .../table/store/file/io/RollingFileWriterTest.java | 99 ++++++++++++++++++++++
 3 files changed, 119 insertions(+), 9 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
index f635ac00..3c1b8fa5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
@@ -40,6 +40,8 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class);
 
+    private static final int CHECK_ROLLING_RECORD_CNT = 1000;
+
     private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
     private final long targetFileSize;
     private final List<SingleFileWriter<T, R>> openedWriters;
@@ -63,6 +65,13 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
         return targetFileSize;
     }
 
+    @VisibleForTesting
+    boolean rollingFile() throws IOException {
+        // Avoid querying length() per record: check only every CHECK_ROLLING_RECORD_CNT records.
+        return recordCount % CHECK_ROLLING_RECORD_CNT == 0
+                && currentWriter.length() >= targetFileSize;
+    }
+
     @Override
     public void write(T row) throws IOException {
         try {
@@ -74,7 +83,7 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
             currentWriter.write(row);
             recordCount += 1;
 
-            if (currentWriter.length() >= targetFileSize) {
+            if (rollingFile()) {
                 closeCurrentWriter();
             }
         } catch (Throwable e) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index 037faeeb..9d67c7d8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -184,11 +184,11 @@ public class AppendOnlyWriterTest {
 
     @Test
     public void testRollingWrite() throws Exception {
-        // Set a very small target file size, so that we will roll over to a new file even if
-        // writing one record.
+        // Set a very small target file size, so the threshold to trigger rolling becomes record
+        // count instead of file size, because we check rolling per 1000 records.
         AppendOnlyWriter writer = createEmptyWriter(10L);
 
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 10 * 1000; i++) {
             writer.write(row(i, String.format("%03d", i), PART));
         }
 
@@ -204,21 +204,23 @@ public class AppendOnlyWriterTest {
             Path path = pathFactory.toPath(meta.fileName());
             assertThat(path.getFileSystem().exists(path)).isTrue();
 
-            assertThat(meta.rowCount()).isEqualTo(1L);
+            assertThat(meta.rowCount()).isEqualTo(1000L);
             assertThat(meta.minKey()).isEqualTo(EMPTY_ROW);
             assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW);
             assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS);
 
+            int min = id * 1000;
+            int max = id * 1000 + 999;
             FieldStats[] expected =
                     new FieldStats[] {
-                        initStats(id, id, 0),
-                        initStats(String.format("%03d", id), String.format("%03d", id), 0),
+                        initStats(min, max, 0),
+                        initStats(String.format("%03d", min), String.format("%03d", max), 0),
                         initStats(PART, PART, 0)
                     };
             assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected));
 
-            assertThat(meta.minSequenceNumber()).isEqualTo(id);
-            assertThat(meta.maxSequenceNumber()).isEqualTo(id);
+            assertThat(meta.minSequenceNumber()).isEqualTo(min);
+            assertThat(meta.maxSequenceNumber()).isEqualTo(max);
             assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL);
 
             id += 1;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/RollingFileWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/RollingFileWriterTest.java
new file mode 100644
index 00000000..9086187e
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/RollingFileWriterTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RollingFileWriter}. */
+public class RollingFileWriterTest {
+
+    private static final RowType SCHEMA =
+            RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"});
+
+    /**
+     * Set a very small target file size, so the threshold to trigger rolling becomes record count
+     * instead of file size, because we check rolling per 1000 records.
+     */
+    private static final Long TARGET_FILE_SIZE = 64L;
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private RollingFileWriter<RowData, DataFileMeta> rollingFileWriter;
+
+    @BeforeEach
+    public void beforeEach() {
+        FileFormat fileFormat = FileFormat.fromIdentifier("avro", new Configuration());
+        rollingFileWriter =
+                new RollingFileWriter<>(
+                        () ->
+                                new RowDataFileWriter(
+                                        fileFormat.createWriterFactory(SCHEMA),
+                                        new DataFilePathFactory(
+                                                        new Path(tempDir.toString()),
+                                                        "",
+                                                        0,
+                                                        CoreOptions.FILE_FORMAT.defaultValue())
+                                                .newPath(),
+                                        SCHEMA,
+                                        fileFormat.createStatsExtractor(SCHEMA).orElse(null),
+                                        0L,
+                                        new LongCounter(0)),
+                        TARGET_FILE_SIZE);
+    }
+
+    @Test
+    public void testRolling() throws IOException {
+        for (int i = 0; i < 3000; i++) {
+            rollingFileWriter.write(GenericRowData.of(i));
+            if (i < 1000) {
+                assertFileNum(1);
+            } else if (i < 2000) {
+                assertFileNum(2);
+            } else {
+                assertFileNum(3);
+            }
+        }
+    }
+
+    private void assertFileNum(int expected) {
+        File dataDir = tempDir.resolve("bucket-0").toFile();
+        File[] files = dataDir.listFiles();
+        assertThat(files).isNotNull().hasSize(expected);
+    }
+}