Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/07 02:23:20 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

JingsongLi commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867288653


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceeds the target file size.
+ *
+ * @param <T> record data type.
+ */
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {

Review Comment:
   Would it be better to rename `RollingFile` to `RollingFileWriter`? `RollingFileWriter` is here to replace `RollingFile`, so we should delete `RollingFile`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
  [... license header, package declaration, imports, and class javadoc elided; identical to the first RollingFileWriter.java quote above ...]
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {
+
+    private final Supplier<BaseFileWriter<T>> writerFactory;
+    private final long targetFileSize;
+    private final List<DataFileMeta> results;
+
+    private BaseFileWriter<T> currentWriter = null;
+    private long recordCount = 0;
+    private boolean closed = false;
+
+    public RollingFileWriter(Supplier<BaseFileWriter<T>> writerFactory, long targetFileSize) {
+        this.writerFactory = writerFactory;
+        this.targetFileSize = targetFileSize;
+        this.results = new ArrayList<>();
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        // Open a new writer if this is the first record or a roll-over happened before.
+        if (currentWriter == null) {
+            currentWriter = writerFactory.get();
+        }
+
+        currentWriter.write(row);
+        recordCount += 1;
+
+        if (currentWriter.length() >= targetFileSize) {
+            currentWriter.close();
+            results.add(currentWriter.result());
+
+            currentWriter = null;
+        }
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        long lengthOfClosedFiles = results.stream().mapToLong(DataFileMeta::fileSize).sum();
+        if (currentWriter != null) {
+            lengthOfClosedFiles += currentWriter.length();

Review Comment:
   `lengthOfClosedFiles` -> `totalLength`? Because it also includes the length of the still-open writer.
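
   A minimal sketch of the suggested rename, applied to the `length()` method quoted above (same behavior, clearer name):

   ```java
   @Override
   public long length() throws IOException {
       // Total bytes written so far: all closed files plus the still-open writer.
       long totalLength = results.stream().mapToLong(DataFileMeta::fileSize).sum();
       if (currentWriter != null) {
           totalLength += currentWriter.length();
       }
       return totalLength;
   }
   ```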



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##########
@@ -97,172 +99,176 @@ public DataFilePathFactory pathFactory() {
      */
     public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
             throws Exception {
-        DataRollingFile rollingFile =
-                fileStatsExtractor == null
-                        ? new StatsCollectingRollingFile(level)
-                        : new FileExtractingRollingFile(level);
-        List<DataFileMeta> result = new ArrayList<>();
-        List<Path> filesToCleanUp = new ArrayList<>();
-        try {
-            rollingFile.write(iterator, result, filesToCleanUp);
+
+        RollingKvWriter rollingKvWriter = createRollingKvWriter(level, suggestedFileSize);
+        try (RollingKvWriter writer = rollingKvWriter) {
+            writer.write(iterator);
+
         } catch (Throwable e) {
             LOG.warn("Exception occurs when writing data files. Cleaning up.", e);
-            for (Path path : filesToCleanUp) {
-                FileUtils.deleteOrWarn(path);
-            }
+
+            rollingKvWriter.abort();
             throw e;
-        } finally {
-            iterator.close();
         }
-        return result;
+
+        return rollingKvWriter.result();
     }
 
     public void delete(DataFileMeta file) {
         FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
     }
 
-    private abstract class DataRollingFile extends RollingFile<KeyValue, DataFileMeta> {
+    private class KvBulkWriter implements BulkWriter<KeyValue> {
 
-        private final int level;
-        private final KeyValueSerializer serializer;
-        private final RowDataSerializer keySerializer;
+        private final BulkWriter<RowData> writer;
+        private final KeyValueSerializer keyValueSerializer;
 
-        private long rowCount;
-        private BinaryRowData minKey;
-        private RowData maxKey;
-        private long minSequenceNumber;
-        private long maxSequenceNumber;
-
-        private DataRollingFile(int level) {
-            // each level 0 data file is a sorted run,
-            // we must not write rolling files for level 0 data files
-            // otherwise we cannot reduce the number of sorted runs when compacting
-            super(level == 0 ? Long.MAX_VALUE : suggestedFileSize);
-            this.level = level;
-            this.serializer = new KeyValueSerializer(keyType, valueType);
-            this.keySerializer = new RowDataSerializer(keyType);
-            resetMeta();
+        KvBulkWriter(BulkWriter<RowData> writer) {
+            this.writer = writer;
+            this.keyValueSerializer = new KeyValueSerializer(keyType, valueType);
         }
 
         @Override
-        protected Path newPath() {
-            return pathFactory.newPath();
+        public void addElement(KeyValue keyValue) throws IOException {
+            writer.addElement(keyValueSerializer.toRow(keyValue));
         }
 
         @Override
-        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
-            return writerFactory.create(out);
+        public void flush() throws IOException {
+            writer.flush();
         }
 
         @Override
-        protected RowData toRowData(KeyValue kv) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing key-value to data file, kv: " + kv.toString(keyType, valueType));
-            }
-
-            rowCount++;
-            if (minKey == null) {
-                minKey = keySerializer.toBinaryRow(kv.key()).copy();
-            }
-            maxKey = kv.key();
-            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
-            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
-
-            return serializer.toRow(kv);
+        public void finish() throws IOException {
+            writer.finish();
         }
+    }
+
+    private class KvBulkWriterFactory implements BulkWriter.Factory<KeyValue> {
 
         @Override
-        protected DataFileMeta collectFile(Path path) throws IOException {
-            KeyAndValueStats stats = extractStats(path);
-            DataFileMeta result =
-                    new DataFileMeta(
-                            path.getName(),
-                            FileUtils.getFileSize(path),
-                            rowCount,
-                            minKey,
-                            keySerializer.toBinaryRow(maxKey).copy(),
-                            stats.keyStats,
-                            stats.valueStats,
-                            minSequenceNumber,
-                            maxSequenceNumber,
-                            level);
-            resetMeta();
-            return result;
+        public BulkWriter<KeyValue> create(FSDataOutputStream out) throws IOException {
+            return new KvBulkWriter(writerFactory.create(out));
         }
+    }
 
-        protected void resetMeta() {
-            rowCount = 0;
-            minKey = null;
-            maxKey = null;
-            minSequenceNumber = Long.MAX_VALUE;
-            maxSequenceNumber = Long.MIN_VALUE;
-        }
+    private class KvFileWriter extends BaseFileWriter<KeyValue> {
+        private final int level;
+        private final RowDataSerializer keySerializer;
 
-        protected abstract KeyAndValueStats extractStats(Path path);
-    }
+        private FieldStatsCollector keyStatsCollector = null;
+        private FieldStatsCollector valueStatsCollector = null;
 
-    private class FileExtractingRollingFile extends DataRollingFile {
+        private BinaryRowData minKey = null;
+        private BinaryRowData maxKey = null;
+        private Long minSequenceNumber = null;
+        private Long maxSequenceNumber = null;
 
-        private FileExtractingRollingFile(int level) {
-            super(level);
+        public KvFileWriter(BulkWriter.Factory<KeyValue> writerFactory, Path path, int level)
+                throws IOException {
+            super(writerFactory, path);
+
+            this.level = level;
+            this.keySerializer = new RowDataSerializer(keyType);
+            if (fileStatsExtractor == null) {
+                this.keyStatsCollector = new FieldStatsCollector(keyType);
+                this.valueStatsCollector = new FieldStatsCollector(valueType);
+            }
         }
 
         @Override
-        protected KeyAndValueStats extractStats(Path path) {
-            FieldStats[] rawStats;
-            try {
-                rawStats = fileStatsExtractor.extract(path);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+        public void write(KeyValue kv) throws IOException {
+            super.write(kv);
+
+            if (fileStatsExtractor == null) {
+                keyStatsCollector.collect(kv.key());
+                valueStatsCollector.collect(kv.value());
             }
 
-            int numKeyFields = keyType.getFieldCount();
-            return new KeyAndValueStats(
-                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
-                    Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
-        }
-    }
+            updateMinKey(kv);
+            updateMaxKey(kv);
 
-    private class StatsCollectingRollingFile extends DataRollingFile {
+            updateMinSeqNumber(kv);
+            updateMaxSeqNumber(kv);
+        }
 
-        private FieldStatsCollector keyStatsCollector;
-        private FieldStatsCollector valueStatsCollector;
+        private void updateMinKey(KeyValue kv) {
+            if (minKey == null) {
+                minKey = keySerializer.toBinaryRow(kv.key()).copy();
+            }
+        }
 
-        private StatsCollectingRollingFile(int level) {
-            super(level);
+        private void updateMaxKey(KeyValue kv) {
+            maxKey = keySerializer.toBinaryRow(kv.key()).copy();

Review Comment:
   We should avoid copying per record.
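
   For example, a minimal sketch that keeps only a reference on the hot path and copies once per file, assuming (as the old `DataRollingFile` did) that the incoming key object stays valid until the file is finished:

   ```java
   // Widen the field to RowData and skip per-record serialization/copying.
   private RowData maxKey = null;

   private void updateMaxKey(KeyValue kv) {
       maxKey = kv.key(); // reference only; no copy per record
   }

   // Copy exactly once, when the finished file's DataFileMeta is built.
   private BinaryRowData copiedMaxKey() {
       return keySerializer.toBinaryRow(maxKey).copy();
   }
   ```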



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
  [... license header, package declaration, imports, and class javadoc elided; identical to the first RollingFileWriter.java quote above ...]
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {
+
+    private final Supplier<BaseFileWriter<T>> writerFactory;
+    private final long targetFileSize;
+    private final List<DataFileMeta> results;
+
+    private BaseFileWriter<T> currentWriter = null;

Review Comment:
   `BaseFileWriter` -> `FileWriter<T>`?
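
   A sketch of the loosened typing, assuming the `DataFileMeta`-producing variant of the interface exposes `result()` and `close()`:

   ```java
   // Depend on the interface rather than the abstract base class.
   private final Supplier<? extends FileWriter<T, DataFileMeta>> writerFactory;

   private FileWriter<T, DataFileMeta> currentWriter = null;
   ```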



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##########
@@ -97,172 +99,176 @@ public DataFilePathFactory pathFactory() {
  [... hunk identical to the first DataFileWriter.java quote above; only the tail near this comment is shown ...]
+        private void updateMaxKey(KeyValue kv) {
+            maxKey = keySerializer.toBinaryRow(kv.key()).copy();
         }
 
-        @Override
-        protected RowData toRowData(KeyValue kv) {
-            keyStatsCollector.collect(kv.key());
-            valueStatsCollector.collect(kv.value());
-            return super.toRowData(kv);
+        private void updateMinSeqNumber(KeyValue kv) {
+            if (minSequenceNumber == null) {
+                minSequenceNumber = kv.sequenceNumber();

Review Comment:
   Just initialize `minSequenceNumber` with a non-null value?
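
   A sketch of the non-null initialization, mirroring the defaults the old `DataRollingFile.resetMeta()` used:

   ```java
   private long minSequenceNumber = Long.MAX_VALUE;
   private long maxSequenceNumber = Long.MIN_VALUE;

   private void updateMinSeqNumber(KeyValue kv) {
       minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
   }

   private void updateMaxSeqNumber(KeyValue kv) {
       maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
   }
   ```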



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The abstracted base file writer implementation for {@link FileWriter}.
+ *
+ * @param <T> record data type.
+ */
+public abstract class BaseFileWriter<T> implements FileWriter<T, DataFileMeta> {
+
+    private final Path path;
+
+    private long recordCount;
+    private FSDataOutputStream currentOut;
+    private BulkWriter<T> currentWriter;
+
+    private boolean closed = false;
+
+    public BaseFileWriter(BulkWriter.Factory<T> writerFactory, Path path) throws IOException {
+        this.path = path;
+
+        this.recordCount = 0;
+        this.currentOut = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
+        this.currentWriter = writerFactory.create(currentOut);
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        currentWriter.addElement(row);
+        recordCount += 1;
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        return currentOut.getPos();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        currentWriter.flush();
+    }
+
+    protected abstract DataFileMeta createDataFileMeta(Path path) throws IOException;
+
+    @Override
+    public void abort() {

Review Comment:
   Should the invoker close the writer first, before calling `abort`?
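
   As a sketch, `abort()` could instead do a best-effort close itself, so the invoker does not have to (assuming the `path`, `currentOut`, and `currentWriter` fields as quoted above):

   ```java
   @Override
   public void abort() {
       // Best effort: release the writer and stream before deleting the file.
       try {
           currentWriter.finish();
           currentOut.close();
       } catch (IOException ignored) {
           // The file is about to be deleted anyway.
       }
       FileUtils.deleteOrWarn(path);
   }
   ```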



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
  [... license header, imports, javadoc, fields, constructor, and write() elided; repeating the earlier RollingFileWriter.java quotes above ...]
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        long lengthOfClosedFiles = results.stream().mapToLong(DataFileMeta::fileSize).sum();
+        if (currentWriter != null) {
+            lengthOfClosedFiles += currentWriter.length();
+        }
+
+        return lengthOfClosedFiles;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (currentWriter != null) {
+            currentWriter.flush();
+        }
+    }
+
+    @Override
+    public void abort() {
+        // TODO abort to delete all created files.

Review Comment:
   finish this?
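
   A sketch of one way to finish it, assuming `BaseFileWriter.abort()` deletes its own file and a hypothetical `pathFactory` can map the already-closed files back to paths:

   ```java
   @Override
   public void abort() {
       if (currentWriter != null) {
           currentWriter.abort(); // deletes the in-progress file
           currentWriter = null;
       }
       // Also remove the files that were already rolled and closed.
       for (DataFileMeta meta : results) {
           FileUtils.deleteOrWarn(pathFactory.toPath(meta.fileName())); // pathFactory is hypothetical here
       }
       results.clear();
   }
   ```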



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org