Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/06 10:10:45 UTC

[GitHub] [flink-table-store] openinx opened a new pull request, #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

openinx opened a new pull request, #110:
URL: https://github.com/apache/flink-table-store/pull/110

   This is a sub PR for https://github.com/apache/flink-table-store/pull/99


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867327547


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * File writer to accept one record or a branch of records and generate metadata after closing it.
+ *
+ * @param <T> record type.
+ * @param <R> file result to collect.
+ */
+public interface FileWriter<T, R> extends Closeable {
+
+    /**
+     * Write only one record to this file.
+     *
+     * @param record to write.
+     * @throws IOException if encounter any IO error.
+     */
+    void write(T record) throws IOException;
+
+    /**
+     * Add records from {@link Iterator} to this file writer.
+     *
+     * @param records to write
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterator<T> records) throws IOException {
+        while (records.hasNext()) {
+            write(records.next());
+        }
+    }
+
+    /**
+     * Add records from {@link Iterable} to file writer.
+     *
+     * @param records to write.
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterable<T> records) throws IOException {
+        for (T record : records) {
+            write(record);
+        }
+    }
+
+    /**
+     * The total written record count.
+     *
+     * @return record count.
+     */
+    long recordCount();
+
+    /**
+     * The estimated length of the current writer.
+     *
+     * @return the estimated length.
+     * @throws IOException if encounter any IO error.
+     */
+    long length() throws IOException;
+
+    /**
+     * Flush the buffered records into underlying file system.
+     *
+     * @throws IOException if encounter any IO error.
+     */
+    void flush() throws IOException;

Review Comment:
   I have some confusion about the relationship between flush, close, and result:
   - Does flush need to exist at all?
   - Could close and result be merged, so that closing returns the result directly?





[GitHub] [flink-table-store] openinx commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867334974


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * File writer to accept one record or a branch of records and generate metadata after closing it.
+ *
+ * @param <T> record type.
+ * @param <R> file result to collect.
+ */
+public interface FileWriter<T, R> extends Closeable {
+
+    /**
+     * Write only one record to this file.
+     *
+     * @param record to write.
+     * @throws IOException if encounter any IO error.
+     */
+    void write(T record) throws IOException;
+
+    /**
+     * Add records from {@link Iterator} to this file writer.
+     *
+     * @param records to write
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterator<T> records) throws IOException {
+        while (records.hasNext()) {
+            write(records.next());
+        }
+    }
+
+    /**
+     * Add records from {@link Iterable} to file writer.
+     *
+     * @param records to write.
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterable<T> records) throws IOException {
+        for (T record : records) {
+            write(record);
+        }
+    }
+
+    /**
+     * The total written record count.
+     *
+     * @return record count.
+     */
+    long recordCount();
+
+    /**
+     * The estimated length of the current writer.
+     *
+     * @return the estimated length.
+     * @throws IOException if encounter any IO error.
+     */
+    long length() throws IOException;
+
+    /**
+     * Flush the buffered records into underlying file system.
+     *
+     * @throws IOException if encounter any IO error.
+     */
+    void flush() throws IOException;

Review Comment:
   > Is there a need for flush to exist?
   
   The current `RollingFileWriter` or `BaseFileWriter` should not need any `flush` or `sync` semantics from the filesystem. A future writer for `LogEntry` (similar to HBase's `HLog`) will rely on this interface, because once we write a transaction into the table store, we need to make sure that all of its data is persisted in the filesystem.
   
   For now, I'm okay with removing this method in the current version. I expect there will be another `Writer` API version for the future table store service.
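
   A minimal sketch of that future use case (hypothetical class and names, not part of this PR): a log-style writer must force its bytes to stable storage before acknowledging a transaction, which is what a durability-oriented `flush()` would be for.

   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;

   // Hypothetical log-entry writer sketch: append the records of one transaction,
   // then flush() forces them to stable storage before the transaction is
   // acknowledged. The rolling data-file writer does not need this, because its
   // files only become visible through a later commit.
   class SketchLogWriter implements AutoCloseable {
       private final FileChannel channel;

       SketchLogWriter(Path path) throws IOException {
           this.channel = FileChannel.open(
                   path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
       }

       void append(byte[] serializedEntry) throws IOException {
           channel.write(ByteBuffer.wrap(serializedEntry));
       }

       void flush() throws IOException {
           channel.force(false); // fsync the data (not metadata) to make the entries durable
       }

       @Override
       public void close() throws IOException {
           channel.close();
       }
   }
   ```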
   
   



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * File writer to accept one record or a branch of records and generate metadata after closing it.
+ *
+ * @param <T> record type.
+ * @param <R> file result to collect.
+ */
+public interface FileWriter<T, R> extends Closeable {
+
+    /**
+     * Write only one record to this file.
+     *
+     * @param record to write.
+     * @throws IOException if encounter any IO error.
+     */
+    void write(T record) throws IOException;
+
+    /**
+     * Add records from {@link Iterator} to this file writer.
+     *
+     * @param records to write
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterator<T> records) throws IOException {
+        while (records.hasNext()) {
+            write(records.next());
+        }
+    }
+
+    /**
+     * Add records from {@link Iterable} to file writer.
+     *
+     * @param records to write.
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterable<T> records) throws IOException {
+        for (T record : records) {
+            write(record);
+        }
+    }
+
+    /**
+     * The total written record count.
+     *
+     * @return record count.
+     */
+    long recordCount();
+
+    /**
+     * The estimated length of the current writer.
+     *
+     * @return the estimated length.
+     * @throws IOException if encounter any IO error.
+     */
+    long length() throws IOException;
+
+    /**
+     * Flush the buffered records into underlying file system.
+     *
+     * @throws IOException if encounter any IO error.
+     */
+    void flush() throws IOException;
+
+    /** Abort to clear orphan file(s) if encounter any error. */
+    default void abort() {}

Review Comment:
   Sounds good to me; let's remove the `default` modifier.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * File writer to accept one record or a branch of records and generate metadata after closing it.
+ *
+ * @param <T> record type.
+ * @param <R> file result to collect.
+ */
+public interface FileWriter<T, R> extends Closeable {
+
+    /**
+     * Write only one record to this file.
+     *
+     * @param record to write.
+     * @throws IOException if encounter any IO error.
+     */
+    void write(T record) throws IOException;
+
+    /**
+     * Add records from {@link Iterator} to this file writer.
+     *
+     * @param records to write
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterator<T> records) throws IOException {
+        while (records.hasNext()) {
+            write(records.next());
+        }
+    }
+
+    /**
+     * Add records from {@link Iterable} to file writer.
+     *
+     * @param records to write.
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterable<T> records) throws IOException {
+        for (T record : records) {
+            write(record);
+        }
+    }
+
+    /**
+     * The total written record count.
+     *
+     * @return record count.
+     */
+    long recordCount();
+
+    /**
+     * The estimated length of the current writer.
+     *
+     * @return the estimated length.
+     * @throws IOException if encounter any IO error.
+     */
+    long length() throws IOException;
+
+    /**
+     * Flush the buffered records into underlying file system.
+     *
+     * @throws IOException if encounter any IO error.
+     */
+    void flush() throws IOException;

Review Comment:
   > Is it possible to merge close and result, and return result directly when closing
   
   I would suggest keeping the separate `close()` and `result()` approach, because `void close() throws IOException` is the standard `java.io.Closeable` contract (which extends `java.io.AutoCloseable`). That means we can use `try (writer = ...) {}` (without any `finally` block) and `IOUtils.closeQuietly` to close the writer automatically.
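
   To illustrate the point, here is a self-contained sketch (the `SketchWriter` interface and `writeAll` helper are illustrative, not this PR's actual classes) of how a separate `close()`/`result()` pair composes with try-with-resources:

   ```java
   import java.io.Closeable;
   import java.io.IOException;

   // Illustrative sketch: the writer is a Closeable, so try-with-resources handles
   // close(), and result() is read only after the try block completes successfully.
   interface SketchWriter<T, R> extends Closeable {
       void write(T record) throws IOException;

       R result() throws IOException; // valid once the writer has been closed

       void abort();                  // delete any partially written files
   }

   class SketchUsage {
       static <T, R> R writeAll(SketchWriter<T, R> writer, Iterable<T> records) throws IOException {
           try (SketchWriter<T, R> w = writer) {
               for (T record : records) {
                   w.write(record);
               }
           } catch (Throwable t) {
               writer.abort();        // clean up orphan files on failure
               throw t;
           }
           return writer.result();    // no finally block needed for close()
       }
   }
   ```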





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867479945


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##########
@@ -91,176 +94,149 @@ public DataFilePathFactory pathFactory() {
     }
 
     /**
-     * Write several {@link KeyValue}s into an data file of a given level.
+     * Write several {@link KeyValue}s into a data file of a given level.
      *
      * <p>NOTE: This method is atomic.
      */
     public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
             throws Exception {
-        DataRollingFile rollingFile =
-                fileStatsExtractor == null
-                        ? new StatsCollectingRollingFile(level)
-                        : new FileExtractingRollingFile(level);
-        List<DataFileMeta> result = new ArrayList<>();
-        List<Path> filesToCleanUp = new ArrayList<>();
-        try {
-            rollingFile.write(iterator, result, filesToCleanUp);
+
+        RollingKvWriter rollingKvWriter = createRollingKvWriter(level, suggestedFileSize);
+        try (RollingKvWriter writer = rollingKvWriter) {
+            writer.write(iterator);
+
         } catch (Throwable e) {
             LOG.warn("Exception occurs when writing data files. Cleaning up.", e);
-            for (Path path : filesToCleanUp) {
-                FileUtils.deleteOrWarn(path);
-            }
+
+            rollingKvWriter.abort();
             throw e;
         } finally {
             iterator.close();
         }
-        return result;
+
+        return rollingKvWriter.result();
     }
 
     public void delete(DataFileMeta file) {
         FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
     }
 
-    private abstract class DataRollingFile extends RollingFile<KeyValue, DataFileMeta> {
+    private class KvBulkWriterFactory implements BulkWriter.Factory<KeyValue> {
+
+        @Override
+        public BulkWriter<KeyValue> create(FSDataOutputStream out) throws IOException {
+            KeyValueSerializer serializer = new KeyValueSerializer(keyType, valueType);
+
+            return new BaseBulkWriter<>(writerFactory.create(out), serializer::toRow);
+        }
+    }
 
+    private class KvFileWriter extends BaseFileWriter<KeyValue, DataFileMeta> {
         private final int level;
-        private final KeyValueSerializer serializer;
         private final RowDataSerializer keySerializer;
 
-        private long rowCount;
-        private BinaryRowData minKey;
-        private RowData maxKey;
-        private long minSequenceNumber;
-        private long maxSequenceNumber;
-
-        private DataRollingFile(int level) {
-            // each level 0 data file is a sorted run,
-            // we must not write rolling files for level 0 data files
-            // otherwise we cannot reduce the number of sorted runs when compacting
-            super(level == 0 ? Long.MAX_VALUE : suggestedFileSize);
+        private FieldStatsCollector keyStatsCollector = null;
+        private FieldStatsCollector valueStatsCollector = null;
+
+        private BinaryRowData minKey = null;
+        private RowData maxKey = null;
+        private long minSeqNumber = Long.MAX_VALUE;
+        private long maxSeqNumber = Long.MIN_VALUE;
+
+        public KvFileWriter(BulkWriter.Factory<KeyValue> writerFactory, Path path, int level)
+                throws IOException {
+            super(writerFactory, path);
+
             this.level = level;
-            this.serializer = new KeyValueSerializer(keyType, valueType);
             this.keySerializer = new RowDataSerializer(keyType);
-            resetMeta();
-        }
-
-        @Override
-        protected Path newPath() {
-            return pathFactory.newPath();
+            if (fileStatsExtractor == null) {
+                this.keyStatsCollector = new FieldStatsCollector(keyType);
+                this.valueStatsCollector = new FieldStatsCollector(valueType);
+            }
         }
 
         @Override
-        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
-            return writerFactory.create(out);
-        }
+        public void write(KeyValue kv) throws IOException {
+            super.write(kv);
 
-        @Override
-        protected RowData toRowData(KeyValue kv) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing key-value to data file, kv: " + kv.toString(keyType, valueType));
+            if (fileStatsExtractor == null) {

Review Comment:
   The many `fileStatsExtractor == null` checks look bad.
   I think we can introduce a `StatsProducer` that unifies `StatsExtractor` and `StatsCollector`, to reduce caller complexity. I created https://issues.apache.org/jira/browse/FLINK-27543 for this.
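
   A rough sketch of that idea (names and signatures are assumptions for illustration, not the actual FLINK-27543 API): the writer calls one object unconditionally, and whether the stats come from per-record collection or from the finished file becomes an implementation detail.

   ```java
   import java.io.IOException;

   // Hypothetical StatsProducer sketch: hides whether field stats are collected
   // record by record or extracted from the finished file, so the writer no
   // longer branches on `fileStatsExtractor == null`.
   interface StatsProducer<T, S> {
       // Invoked for every written record; a file-extracting producer may ignore it.
       default void collect(T record) {}

       // Invoked once the file is finished; returns the final stats.
       S finish(String writtenFilePath) throws IOException;
   }

   class StatsAwareWriter<T, S> {
       private final StatsProducer<T, S> stats;

       StatsAwareWriter(StatsProducer<T, S> stats) {
           this.stats = stats;
       }

       void onRecord(T record) {
           stats.collect(record);      // no null check on the write path
       }

       S onFileFinished(String path) throws IOException {
           return stats.finish(path);  // collector- or extractor-backed, transparently
       }
   }
   ```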





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867438427


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * File writer to accept one record or a branch of records and generate metadata after closing it.
+ *
+ * @param <T> record type.
+ * @param <R> file result to collect.
+ */
+public interface FileWriter<T, R> extends Closeable {
+
+    /**
+     * Write only one record to this file.
+     *
+     * @param record to write.
+     * @throws IOException if encounter any IO error.
+     */
+    void write(T record) throws IOException;
+
+    /**
+     * Add records from {@link Iterator} to this file writer.
+     *
+     * @param records to write
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterator<T> records) throws IOException {
+        while (records.hasNext()) {
+            write(records.next());
+        }
+    }
+
+    /**
+     * Add records from {@link Iterable} to file writer.
+     *
+     * @param records to write.
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterable<T> records) throws IOException {
+        for (T record : records) {
+            write(record);
+        }
+    }
+
+    /**
+     * The total written record count.
+     *
+     * @return record count.
+     */
+    long recordCount();
+
+    /**
+     * The estimated length of the current writer.
+     *
+     * @return the estimated length.
+     * @throws IOException if encounter any IO error.
+     */
+    long length() throws IOException;
+
+    /**
+     * Flush the buffered records into underlying file system.
+     *
+     * @throws IOException if encounter any IO error.
+     */
+    void flush() throws IOException;

Review Comment:
   Sounds reasonable to me~





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867288653


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceed the target file size.
+ *
+ * @param <T> record data type.
+ */
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {

Review Comment:
   Wouldn't it be better to rename `RollingFile` to `RollingFileWriter`? `RollingFileWriter` is here to replace `RollingFile`, so we should delete `RollingFile`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceed the target file size.
+ *
+ * @param <T> record data type.
+ */
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {
+
+    private final Supplier<BaseFileWriter<T>> writerFactory;
+    private final long targetFileSize;
+    private final List<DataFileMeta> results;
+
+    private BaseFileWriter<T> currentWriter = null;
+    private long recordCount = 0;
+    private boolean closed = false;
+
+    public RollingFileWriter(Supplier<BaseFileWriter<T>> writerFactory, long targetFileSize) {
+        this.writerFactory = writerFactory;
+        this.targetFileSize = targetFileSize;
+        this.results = new ArrayList<>();
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        // Open the current writer if write the first record or roll over happen before.
+        if (currentWriter == null) {
+            currentWriter = writerFactory.get();
+        }
+
+        currentWriter.write(row);
+        recordCount += 1;
+
+        if (currentWriter.length() >= targetFileSize) {
+            currentWriter.close();
+            results.add(currentWriter.result());
+
+            currentWriter = null;
+        }
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        long lengthOfClosedFiles = results.stream().mapToLong(DataFileMeta::fileSize).sum();
+        if (currentWriter != null) {
+            lengthOfClosedFiles += currentWriter.length();

Review Comment:
   `lengthOfClosedFiles` -> `totalLength`?
   It also includes the length of the still-open writer.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##########
@@ -97,172 +99,176 @@ public DataFilePathFactory pathFactory() {
      */
     public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
             throws Exception {
-        DataRollingFile rollingFile =
-                fileStatsExtractor == null
-                        ? new StatsCollectingRollingFile(level)
-                        : new FileExtractingRollingFile(level);
-        List<DataFileMeta> result = new ArrayList<>();
-        List<Path> filesToCleanUp = new ArrayList<>();
-        try {
-            rollingFile.write(iterator, result, filesToCleanUp);
+
+        RollingKvWriter rollingKvWriter = createRollingKvWriter(level, suggestedFileSize);
+        try (RollingKvWriter writer = rollingKvWriter) {
+            writer.write(iterator);
+
         } catch (Throwable e) {
             LOG.warn("Exception occurs when writing data files. Cleaning up.", e);
-            for (Path path : filesToCleanUp) {
-                FileUtils.deleteOrWarn(path);
-            }
+
+            rollingKvWriter.abort();
             throw e;
-        } finally {
-            iterator.close();
         }
-        return result;
+
+        return rollingKvWriter.result();
     }
 
     public void delete(DataFileMeta file) {
         FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
     }
 
-    private abstract class DataRollingFile extends RollingFile<KeyValue, DataFileMeta> {
+    private class KvBulkWriter implements BulkWriter<KeyValue> {
 
-        private final int level;
-        private final KeyValueSerializer serializer;
-        private final RowDataSerializer keySerializer;
+        private final BulkWriter<RowData> writer;
+        private final KeyValueSerializer keyValueSerializer;
 
-        private long rowCount;
-        private BinaryRowData minKey;
-        private RowData maxKey;
-        private long minSequenceNumber;
-        private long maxSequenceNumber;
-
-        private DataRollingFile(int level) {
-            // each level 0 data file is a sorted run,
-            // we must not write rolling files for level 0 data files
-            // otherwise we cannot reduce the number of sorted runs when compacting
-            super(level == 0 ? Long.MAX_VALUE : suggestedFileSize);
-            this.level = level;
-            this.serializer = new KeyValueSerializer(keyType, valueType);
-            this.keySerializer = new RowDataSerializer(keyType);
-            resetMeta();
+        KvBulkWriter(BulkWriter<RowData> writer) {
+            this.writer = writer;
+            this.keyValueSerializer = new KeyValueSerializer(keyType, valueType);
         }
 
         @Override
-        protected Path newPath() {
-            return pathFactory.newPath();
+        public void addElement(KeyValue keyValue) throws IOException {
+            writer.addElement(keyValueSerializer.toRow(keyValue));
         }
 
         @Override
-        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
-            return writerFactory.create(out);
+        public void flush() throws IOException {
+            writer.flush();
         }
 
         @Override
-        protected RowData toRowData(KeyValue kv) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing key-value to data file, kv: " + kv.toString(keyType, valueType));
-            }
-
-            rowCount++;
-            if (minKey == null) {
-                minKey = keySerializer.toBinaryRow(kv.key()).copy();
-            }
-            maxKey = kv.key();
-            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
-            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
-
-            return serializer.toRow(kv);
+        public void finish() throws IOException {
+            writer.finish();
         }
+    }
+
+    private class KvBulkWriterFactory implements BulkWriter.Factory<KeyValue> {
 
         @Override
-        protected DataFileMeta collectFile(Path path) throws IOException {
-            KeyAndValueStats stats = extractStats(path);
-            DataFileMeta result =
-                    new DataFileMeta(
-                            path.getName(),
-                            FileUtils.getFileSize(path),
-                            rowCount,
-                            minKey,
-                            keySerializer.toBinaryRow(maxKey).copy(),
-                            stats.keyStats,
-                            stats.valueStats,
-                            minSequenceNumber,
-                            maxSequenceNumber,
-                            level);
-            resetMeta();
-            return result;
+        public BulkWriter<KeyValue> create(FSDataOutputStream out) throws IOException {
+            return new KvBulkWriter(writerFactory.create(out));
         }
+    }
 
-        protected void resetMeta() {
-            rowCount = 0;
-            minKey = null;
-            maxKey = null;
-            minSequenceNumber = Long.MAX_VALUE;
-            maxSequenceNumber = Long.MIN_VALUE;
-        }
+    private class KvFileWriter extends BaseFileWriter<KeyValue> {
+        private final int level;
+        private final RowDataSerializer keySerializer;
 
-        protected abstract KeyAndValueStats extractStats(Path path);
-    }
+        private FieldStatsCollector keyStatsCollector = null;
+        private FieldStatsCollector valueStatsCollector = null;
 
-    private class FileExtractingRollingFile extends DataRollingFile {
+        private BinaryRowData minKey = null;
+        private BinaryRowData maxKey = null;
+        private Long minSequenceNumber = null;
+        private Long maxSequenceNumber = null;
 
-        private FileExtractingRollingFile(int level) {
-            super(level);
+        public KvFileWriter(BulkWriter.Factory<KeyValue> writerFactory, Path path, int level)
+                throws IOException {
+            super(writerFactory, path);
+
+            this.level = level;
+            this.keySerializer = new RowDataSerializer(keyType);
+            if (fileStatsExtractor == null) {
+                this.keyStatsCollector = new FieldStatsCollector(keyType);
+                this.valueStatsCollector = new FieldStatsCollector(valueType);
+            }
         }
 
         @Override
-        protected KeyAndValueStats extractStats(Path path) {
-            FieldStats[] rawStats;
-            try {
-                rawStats = fileStatsExtractor.extract(path);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+        public void write(KeyValue kv) throws IOException {
+            super.write(kv);
+
+            if (fileStatsExtractor == null) {
+                keyStatsCollector.collect(kv.key());
+                valueStatsCollector.collect(kv.value());
             }
 
-            int numKeyFields = keyType.getFieldCount();
-            return new KeyAndValueStats(
-                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
-                    Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
-        }
-    }
+            updateMinKey(kv);
+            updateMaxKey(kv);
 
-    private class StatsCollectingRollingFile extends DataRollingFile {
+            updateMinSeqNumber(kv);
+            updateMaxSeqNumber(kv);
+        }
 
-        private FieldStatsCollector keyStatsCollector;
-        private FieldStatsCollector valueStatsCollector;
+        private void updateMinKey(KeyValue kv) {
+            if (minKey == null) {
+                minKey = keySerializer.toBinaryRow(kv.key()).copy();
+            }
+        }
 
-        private StatsCollectingRollingFile(int level) {
-            super(level);
+        private void updateMaxKey(KeyValue kv) {
+            maxKey = keySerializer.toBinaryRow(kv.key()).copy();

Review Comment:
   We should avoid copying per record.
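
   One possible shape for that (an illustrative sketch, not this PR's code; like the replaced `DataRollingFile`, it assumes the referenced key object stays valid until the file is closed): keep only a reference while writing and perform the binary copy once, when the file result is produced.

   ```java
   import java.util.function.UnaryOperator;

   // Illustrative sketch: track the max key by reference during writing and do the
   // expensive copy only once, when the file is finished. deepCopy stands in for
   // keySerializer.toBinaryRow(key).copy().
   class MaxKeyTracker<K> {
       private final UnaryOperator<K> deepCopy;
       private K latestKey; // reference only, no per-record copy

       MaxKeyTracker(UnaryOperator<K> deepCopy) {
           this.deepCopy = deepCopy;
       }

       void onRecord(K key) {
           // Records arrive in key order, so the last key seen is the max key.
           this.latestKey = key;
       }

       K maxKeyCopy() {
           return latestKey == null ? null : deepCopy.apply(latestKey); // single copy
       }
   }
   ```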



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceed the target file size.
+ *
+ * @param <T> record data type.
+ */
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {
+
+    private final Supplier<BaseFileWriter<T>> writerFactory;
+    private final long targetFileSize;
+    private final List<DataFileMeta> results;
+
+    private BaseFileWriter<T> currentWriter = null;

Review Comment:
   `BaseFileWriter` -> `FileWriter<T>`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##########
@@ -97,172 +99,176 @@ public DataFilePathFactory pathFactory() {
      */
     public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
             throws Exception {
-        DataRollingFile rollingFile =
-                fileStatsExtractor == null
-                        ? new StatsCollectingRollingFile(level)
-                        : new FileExtractingRollingFile(level);
-        List<DataFileMeta> result = new ArrayList<>();
-        List<Path> filesToCleanUp = new ArrayList<>();
-        try {
-            rollingFile.write(iterator, result, filesToCleanUp);
+
+        RollingKvWriter rollingKvWriter = createRollingKvWriter(level, suggestedFileSize);
+        try (RollingKvWriter writer = rollingKvWriter) {
+            writer.write(iterator);
+
         } catch (Throwable e) {
             LOG.warn("Exception occurs when writing data files. Cleaning up.", e);
-            for (Path path : filesToCleanUp) {
-                FileUtils.deleteOrWarn(path);
-            }
+
+            rollingKvWriter.abort();
             throw e;
-        } finally {
-            iterator.close();
         }
-        return result;
+
+        return rollingKvWriter.result();
     }
 
     public void delete(DataFileMeta file) {
         FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
     }
 
-    private abstract class DataRollingFile extends RollingFile<KeyValue, DataFileMeta> {
+    private class KvBulkWriter implements BulkWriter<KeyValue> {
 
-        private final int level;
-        private final KeyValueSerializer serializer;
-        private final RowDataSerializer keySerializer;
+        private final BulkWriter<RowData> writer;
+        private final KeyValueSerializer keyValueSerializer;
 
-        private long rowCount;
-        private BinaryRowData minKey;
-        private RowData maxKey;
-        private long minSequenceNumber;
-        private long maxSequenceNumber;
-
-        private DataRollingFile(int level) {
-            // each level 0 data file is a sorted run,
-            // we must not write rolling files for level 0 data files
-            // otherwise we cannot reduce the number of sorted runs when compacting
-            super(level == 0 ? Long.MAX_VALUE : suggestedFileSize);
-            this.level = level;
-            this.serializer = new KeyValueSerializer(keyType, valueType);
-            this.keySerializer = new RowDataSerializer(keyType);
-            resetMeta();
+        KvBulkWriter(BulkWriter<RowData> writer) {
+            this.writer = writer;
+            this.keyValueSerializer = new KeyValueSerializer(keyType, valueType);
         }
 
         @Override
-        protected Path newPath() {
-            return pathFactory.newPath();
+        public void addElement(KeyValue keyValue) throws IOException {
+            writer.addElement(keyValueSerializer.toRow(keyValue));
         }
 
         @Override
-        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
-            return writerFactory.create(out);
+        public void flush() throws IOException {
+            writer.flush();
         }
 
         @Override
-        protected RowData toRowData(KeyValue kv) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing key-value to data file, kv: " + kv.toString(keyType, valueType));
-            }
-
-            rowCount++;
-            if (minKey == null) {
-                minKey = keySerializer.toBinaryRow(kv.key()).copy();
-            }
-            maxKey = kv.key();
-            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
-            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
-
-            return serializer.toRow(kv);
+        public void finish() throws IOException {
+            writer.finish();
         }
+    }
+
+    private class KvBulkWriterFactory implements BulkWriter.Factory<KeyValue> {
 
         @Override
-        protected DataFileMeta collectFile(Path path) throws IOException {
-            KeyAndValueStats stats = extractStats(path);
-            DataFileMeta result =
-                    new DataFileMeta(
-                            path.getName(),
-                            FileUtils.getFileSize(path),
-                            rowCount,
-                            minKey,
-                            keySerializer.toBinaryRow(maxKey).copy(),
-                            stats.keyStats,
-                            stats.valueStats,
-                            minSequenceNumber,
-                            maxSequenceNumber,
-                            level);
-            resetMeta();
-            return result;
+        public BulkWriter<KeyValue> create(FSDataOutputStream out) throws IOException {
+            return new KvBulkWriter(writerFactory.create(out));
         }
+    }
 
-        protected void resetMeta() {
-            rowCount = 0;
-            minKey = null;
-            maxKey = null;
-            minSequenceNumber = Long.MAX_VALUE;
-            maxSequenceNumber = Long.MIN_VALUE;
-        }
+    private class KvFileWriter extends BaseFileWriter<KeyValue> {
+        private final int level;
+        private final RowDataSerializer keySerializer;
 
-        protected abstract KeyAndValueStats extractStats(Path path);
-    }
+        private FieldStatsCollector keyStatsCollector = null;
+        private FieldStatsCollector valueStatsCollector = null;
 
-    private class FileExtractingRollingFile extends DataRollingFile {
+        private BinaryRowData minKey = null;
+        private BinaryRowData maxKey = null;
+        private Long minSequenceNumber = null;
+        private Long maxSequenceNumber = null;
 
-        private FileExtractingRollingFile(int level) {
-            super(level);
+        public KvFileWriter(BulkWriter.Factory<KeyValue> writerFactory, Path path, int level)
+                throws IOException {
+            super(writerFactory, path);
+
+            this.level = level;
+            this.keySerializer = new RowDataSerializer(keyType);
+            if (fileStatsExtractor == null) {
+                this.keyStatsCollector = new FieldStatsCollector(keyType);
+                this.valueStatsCollector = new FieldStatsCollector(valueType);
+            }
         }
 
         @Override
-        protected KeyAndValueStats extractStats(Path path) {
-            FieldStats[] rawStats;
-            try {
-                rawStats = fileStatsExtractor.extract(path);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+        public void write(KeyValue kv) throws IOException {
+            super.write(kv);
+
+            if (fileStatsExtractor == null) {
+                keyStatsCollector.collect(kv.key());
+                valueStatsCollector.collect(kv.value());
             }
 
-            int numKeyFields = keyType.getFieldCount();
-            return new KeyAndValueStats(
-                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
-                    Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
-        }
-    }
+            updateMinKey(kv);
+            updateMaxKey(kv);
 
-    private class StatsCollectingRollingFile extends DataRollingFile {
+            updateMinSeqNumber(kv);
+            updateMaxSeqNumber(kv);
+        }
 
-        private FieldStatsCollector keyStatsCollector;
-        private FieldStatsCollector valueStatsCollector;
+        private void updateMinKey(KeyValue kv) {
+            if (minKey == null) {
+                minKey = keySerializer.toBinaryRow(kv.key()).copy();
+            }
+        }
 
-        private StatsCollectingRollingFile(int level) {
-            super(level);
+        private void updateMaxKey(KeyValue kv) {
+            maxKey = keySerializer.toBinaryRow(kv.key()).copy();
         }
 
-        @Override
-        protected RowData toRowData(KeyValue kv) {
-            keyStatsCollector.collect(kv.key());
-            valueStatsCollector.collect(kv.value());
-            return super.toRowData(kv);
+        private void updateMinSeqNumber(KeyValue kv) {
+            if (minSequenceNumber == null) {
+                minSequenceNumber = kv.sequenceNumber();

Review Comment:
   Just initialize `minSequenceNumber` with a non-null sentinel value?
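
   For instance (a small sketch of the suggestion, matching what the replaced `DataRollingFile` did), sentinel initial values remove the boxed `Long` null checks from the write path:

   ```java
   // Sketch: primitive fields with sentinel initial values instead of boxed Longs
   // that need a null check on every record.
   class SequenceNumberRange {
       private long minSequenceNumber = Long.MAX_VALUE;
       private long maxSequenceNumber = Long.MIN_VALUE;

       void update(long sequenceNumber) {
           minSequenceNumber = Math.min(minSequenceNumber, sequenceNumber);
           maxSequenceNumber = Math.max(maxSequenceNumber, sequenceNumber);
       }
   }
   ```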



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The abstracted base file writer implementation for {@link FileWriter}.
+ *
+ * @param <T> record data type.
+ */
+public abstract class BaseFileWriter<T> implements FileWriter<T, DataFileMeta> {
+
+    private final Path path;
+
+    private long recordCount;
+    private FSDataOutputStream currentOut;
+    private BulkWriter<T> currentWriter;
+
+    private boolean closed = false;
+
+    public BaseFileWriter(BulkWriter.Factory<T> writerFactory, Path path) throws IOException {
+        this.path = path;
+
+        this.recordCount = 0;
+        this.currentOut = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
+        this.currentWriter = writerFactory.create(currentOut);
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        currentWriter.addElement(row);
+        recordCount += 1;
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        return currentOut.getPos();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        currentWriter.flush();
+    }
+
+    protected abstract DataFileMeta createDataFileMeta(Path path) throws IOException;
+
+    @Override
+    public void abort() {

Review Comment:
   Should the invoker close first?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceed the target file size.
+ *
+ * @param <T> record data type.
+ */
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {
+
+    private final Supplier<BaseFileWriter<T>> writerFactory;
+    private final long targetFileSize;
+    private final List<DataFileMeta> results;
+
+    private BaseFileWriter<T> currentWriter = null;
+    private long recordCount = 0;
+    private boolean closed = false;
+
+    public RollingFileWriter(Supplier<BaseFileWriter<T>> writerFactory, long targetFileSize) {
+        this.writerFactory = writerFactory;
+        this.targetFileSize = targetFileSize;
+        this.results = new ArrayList<>();
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        // Open the current writer if write the first record or roll over happen before.
+        if (currentWriter == null) {
+            currentWriter = writerFactory.get();
+        }
+
+        currentWriter.write(row);
+        recordCount += 1;
+
+        if (currentWriter.length() >= targetFileSize) {
+            currentWriter.close();
+            results.add(currentWriter.result());
+
+            currentWriter = null;
+        }
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        long lengthOfClosedFiles = results.stream().mapToLong(DataFileMeta::fileSize).sum();
+        if (currentWriter != null) {
+            lengthOfClosedFiles += currentWriter.length();
+        }
+
+        return lengthOfClosedFiles;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (currentWriter != null) {
+            currentWriter.flush();
+        }
+    }
+
+    @Override
+    public void abort() {
+        // TODO abort to delete all created files.

Review Comment:
   Finish this TODO?





[GitHub] [flink-table-store] JingsongLi merged pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #110:
URL: https://github.com/apache/flink-table-store/pull/110




[GitHub] [flink-table-store] openinx commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867300460


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceed the target file size.
+ *
+ * @param <T> record data type.
+ */
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {

Review Comment:
   Yes, I will remove `RollingFile` in the next follow-up PR and re-implement the manifest writer using the new writer API.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The abstracted base file writer implementation for {@link FileWriter}.
+ *
+ * @param <T> record data type.
+ */
+public abstract class BaseFileWriter<T> implements FileWriter<T, DataFileMeta> {
+
+    private final Path path;
+
+    private long recordCount;
+    private FSDataOutputStream currentOut;
+    private BulkWriter<T> currentWriter;
+
+    private boolean closed = false;
+
+    public BaseFileWriter(BulkWriter.Factory<T> writerFactory, Path path) throws IOException {
+        this.path = path;
+
+        this.recordCount = 0;
+        this.currentOut = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
+        this.currentWriter = writerFactory.create(currentOut);
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        currentWriter.addElement(row);
+        recordCount += 1;
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        return currentOut.getPos();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        currentWriter.flush();
+    }
+
+    protected abstract DataFileMeta createDataFileMeta(Path path) throws IOException;
+
+    @Override
+    public void abort() {

Review Comment:
   That's true. The abort() method should always call close() internally; that's why I designed close() to be idempotent.
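   A minimal sketch of that pairing, reusing the field names from the snippet above (whether close() also collects the file metadata is left out here):
   ```java
   @Override
   public void close() throws IOException {
       if (closed) {
           return; // idempotent: later calls are no-ops
       }
       closed = true;
       if (currentWriter != null) {
           currentWriter.finish();
           currentWriter = null;
       }
       if (currentOut != null) {
           currentOut.close();
           currentOut = null;
       }
   }

   @Override
   public void abort() {
       try {
           close();
       } catch (IOException ignored) {
           // best effort: the file is deleted right after anyway
       }
       FileUtils.deleteOrWarn(path);
   }
   ```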



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceeds the target file size.
+ *
+ * @param <T> record data type.
+ */
+public class RollingFileWriter<T> implements FileWriter<T, List<DataFileMeta>> {
+
+    private final Supplier<BaseFileWriter<T>> writerFactory;
+    private final long targetFileSize;
+    private final List<DataFileMeta> results;
+
+    private BaseFileWriter<T> currentWriter = null;

Review Comment:
   Yes, I like this idea. 
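   A rough sketch of the lazy-open / roll-over path (assuming the base writer exposes its `DataFileMeta` via `result()` once it has been closed):
   ```java
   @Override
   public void write(T record) throws IOException {
       if (currentWriter == null) {
           // lazily open the first (or next) file on demand
           currentWriter = writerFactory.get();
       }
       currentWriter.write(record);

       if (currentWriter.length() >= targetFileSize) {
           rollToNewFile();
       }
   }

   private void rollToNewFile() throws IOException {
       currentWriter.close();
       results.add(currentWriter.result());
       currentWriter = null; // the next write() opens a fresh file
   }
   ```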



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##########
@@ -97,172 +99,176 @@ public DataFilePathFactory pathFactory() {
      */
     public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
             throws Exception {
-        DataRollingFile rollingFile =
-                fileStatsExtractor == null
-                        ? new StatsCollectingRollingFile(level)
-                        : new FileExtractingRollingFile(level);
-        List<DataFileMeta> result = new ArrayList<>();
-        List<Path> filesToCleanUp = new ArrayList<>();
-        try {
-            rollingFile.write(iterator, result, filesToCleanUp);
+
+        RollingKvWriter rollingKvWriter = createRollingKvWriter(level, suggestedFileSize);
+        try (RollingKvWriter writer = rollingKvWriter) {
+            writer.write(iterator);
+
         } catch (Throwable e) {
             LOG.warn("Exception occurs when writing data files. Cleaning up.", e);
-            for (Path path : filesToCleanUp) {
-                FileUtils.deleteOrWarn(path);
-            }
+
+            rollingKvWriter.abort();
             throw e;
-        } finally {
-            iterator.close();
         }
-        return result;
+
+        return rollingKvWriter.result();
     }
 
     public void delete(DataFileMeta file) {
         FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
     }
 
-    private abstract class DataRollingFile extends RollingFile<KeyValue, DataFileMeta> {
+    private class KvBulkWriter implements BulkWriter<KeyValue> {
 
-        private final int level;
-        private final KeyValueSerializer serializer;
-        private final RowDataSerializer keySerializer;
+        private final BulkWriter<RowData> writer;
+        private final KeyValueSerializer keyValueSerializer;
 
-        private long rowCount;
-        private BinaryRowData minKey;
-        private RowData maxKey;
-        private long minSequenceNumber;
-        private long maxSequenceNumber;
-
-        private DataRollingFile(int level) {
-            // each level 0 data file is a sorted run,
-            // we must not write rolling files for level 0 data files
-            // otherwise we cannot reduce the number of sorted runs when compacting
-            super(level == 0 ? Long.MAX_VALUE : suggestedFileSize);
-            this.level = level;
-            this.serializer = new KeyValueSerializer(keyType, valueType);
-            this.keySerializer = new RowDataSerializer(keyType);
-            resetMeta();
+        KvBulkWriter(BulkWriter<RowData> writer) {
+            this.writer = writer;
+            this.keyValueSerializer = new KeyValueSerializer(keyType, valueType);
         }
 
         @Override
-        protected Path newPath() {
-            return pathFactory.newPath();
+        public void addElement(KeyValue keyValue) throws IOException {
+            writer.addElement(keyValueSerializer.toRow(keyValue));
         }
 
         @Override
-        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
-            return writerFactory.create(out);
+        public void flush() throws IOException {
+            writer.flush();
         }
 
         @Override
-        protected RowData toRowData(KeyValue kv) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing key-value to data file, kv: " + kv.toString(keyType, valueType));
-            }
-
-            rowCount++;
-            if (minKey == null) {
-                minKey = keySerializer.toBinaryRow(kv.key()).copy();
-            }
-            maxKey = kv.key();
-            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
-            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
-
-            return serializer.toRow(kv);
+        public void finish() throws IOException {
+            writer.finish();
         }
+    }
+
+    private class KvBulkWriterFactory implements BulkWriter.Factory<KeyValue> {
 
         @Override
-        protected DataFileMeta collectFile(Path path) throws IOException {
-            KeyAndValueStats stats = extractStats(path);
-            DataFileMeta result =
-                    new DataFileMeta(
-                            path.getName(),
-                            FileUtils.getFileSize(path),
-                            rowCount,
-                            minKey,
-                            keySerializer.toBinaryRow(maxKey).copy(),
-                            stats.keyStats,
-                            stats.valueStats,
-                            minSequenceNumber,
-                            maxSequenceNumber,
-                            level);
-            resetMeta();
-            return result;
+        public BulkWriter<KeyValue> create(FSDataOutputStream out) throws IOException {
+            return new KvBulkWriter(writerFactory.create(out));
         }
+    }
 
-        protected void resetMeta() {
-            rowCount = 0;
-            minKey = null;
-            maxKey = null;
-            minSequenceNumber = Long.MAX_VALUE;
-            maxSequenceNumber = Long.MIN_VALUE;
-        }
+    private class KvFileWriter extends BaseFileWriter<KeyValue> {
+        private final int level;
+        private final RowDataSerializer keySerializer;
 
-        protected abstract KeyAndValueStats extractStats(Path path);
-    }
+        private FieldStatsCollector keyStatsCollector = null;
+        private FieldStatsCollector valueStatsCollector = null;
 
-    private class FileExtractingRollingFile extends DataRollingFile {
+        private BinaryRowData minKey = null;
+        private BinaryRowData maxKey = null;
+        private Long minSequenceNumber = null;
+        private Long maxSequenceNumber = null;
 
-        private FileExtractingRollingFile(int level) {
-            super(level);
+        public KvFileWriter(BulkWriter.Factory<KeyValue> writerFactory, Path path, int level)
+                throws IOException {
+            super(writerFactory, path);
+
+            this.level = level;
+            this.keySerializer = new RowDataSerializer(keyType);
+            if (fileStatsExtractor == null) {
+                this.keyStatsCollector = new FieldStatsCollector(keyType);
+                this.valueStatsCollector = new FieldStatsCollector(valueType);
+            }
         }
 
         @Override
-        protected KeyAndValueStats extractStats(Path path) {
-            FieldStats[] rawStats;
-            try {
-                rawStats = fileStatsExtractor.extract(path);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+        public void write(KeyValue kv) throws IOException {
+            super.write(kv);
+
+            if (fileStatsExtractor == null) {
+                keyStatsCollector.collect(kv.key());
+                valueStatsCollector.collect(kv.value());
             }
 
-            int numKeyFields = keyType.getFieldCount();
-            return new KeyAndValueStats(
-                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
-                    Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
-        }
-    }
+            updateMinKey(kv);
+            updateMaxKey(kv);
 
-    private class StatsCollectingRollingFile extends DataRollingFile {
+            updateMinSeqNumber(kv);
+            updateMaxSeqNumber(kv);
+        }
 
-        private FieldStatsCollector keyStatsCollector;
-        private FieldStatsCollector valueStatsCollector;
+        private void updateMinKey(KeyValue kv) {
+            if (minKey == null) {
+                minKey = keySerializer.toBinaryRow(kv.key()).copy();
+            }
+        }
 
-        private StatsCollectingRollingFile(int level) {
-            super(level);
+        private void updateMaxKey(KeyValue kv) {
+            maxKey = keySerializer.toBinaryRow(kv.key()).copy();

Review Comment:
   Yes, I realized this issue during my first round of self-review.
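   One possible fix, mirroring the old `DataRollingFile` behaviour: keep only a reference while writing (records arrive in key order, so the last key seen is the max key) and take a single defensive copy when the file metadata is assembled. The helper name below is just for illustration:
   ```java
   private RowData lastKey = null;

   private void updateMaxKey(KeyValue kv) {
       lastKey = kv.key(); // no per-record copy
   }

   private BinaryRowData copiedMaxKey() {
       // one copy, taken when the file is finished
       return keySerializer.toBinaryRow(lastKey).copy();
   }
   ```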





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867314010


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * File writer to accept one record or a batch of records and generate metadata after closing it.
+ *
+ * @param <T> record type.
+ * @param <R> file result to collect.
+ */
+public interface FileWriter<T, R> extends Closeable {
+
+    /**
+     * Write only one record to this file.
+     *
+     * @param record to write.
+     * @throws IOException if encounter any IO error.
+     */
+    void write(T record) throws IOException;
+
+    /**
+     * Add records from {@link Iterator} to this file writer.
+     *
+     * @param records to write
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterator<T> records) throws IOException {
+        while (records.hasNext()) {
+            write(records.next());
+        }
+    }
+
+    /**
+     * Add records from {@link Iterable} to file writer.
+     *
+     * @param records to write.
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterable<T> records) throws IOException {
+        for (T record : records) {
+            write(record);
+        }
+    }
+
+    /**
+     * The total written record count.
+     *
+     * @return record count.
+     */
+    long recordCount();
+
+    /**
+     * The estimated length of the current writer.
+     *
+     * @return the estimated length.
+     * @throws IOException if encounter any IO error.
+     */
+    long length() throws IOException;
+
+    /**
+     * Flush the buffered records into underlying file system.
+     *
+     * @throws IOException if encounter any IO error.
+     */
+    void flush() throws IOException;
+
+    /** Abort to clear orphan file(s) if encounter any error. */
+    default void abort() {}

Review Comment:
   It seems that all implementations override `abort`, so there is no need for it to be a default method.
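   i.e. declare it without a body so every implementation has to provide one:
   ```java
   /** Abort to clean up the orphan file(s) if any error occurs. */
   void abort();
   ```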


