You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/02 06:45:14 UTC
[flink-table-store] branch master updated: [FLINK-26442] Allow ManifestFile to write a list of ManifestFileMeta into multiple files if too many
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new b980af0 [FLINK-26442] Allow ManifestFile to write a list of ManifestFileMeta into multiple files if too many
b980af0 is described below
commit b980af00017a8da585882009ce629866e865f2ae
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Mar 2 14:45:10 2022 +0800
[FLINK-26442] Allow ManifestFile to write a list of ManifestFileMeta into multiple files if too many
This closes #29
---
.../flink/table/store/file/FileStoreImpl.java | 7 +-
.../table/store/file/manifest/ManifestFile.java | 113 ++++++++++++-----
.../store/file/manifest/ManifestFileMeta.java | 65 +++++-----
.../store/file/mergetree/sst/SstFileWriter.java | 133 ++++++++++-----------
.../store/file/operation/FileStoreCommitImpl.java | 5 +-
.../flink/table/store/file/utils/RollingFile.java | 125 +++++++++++++++++++
.../store/file/manifest/ManifestFileMetaTest.java | 77 ++++++------
.../store/file/manifest/ManifestFileTest.java | 46 +++++--
.../store/file/mergetree/sst/SstFileTest.java | 32 -----
.../table/store/file/stats/StatsTestUtils.java | 58 +++++++++
10 files changed, 436 insertions(+), 225 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index 2fa5c48..3249c35 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -80,7 +80,12 @@ public class FileStoreImpl implements FileStore {
private ManifestFile.Factory manifestFileFactory() {
return new ManifestFile.Factory(
- partitionType, keyType, valueType, options.manifestFormat(), pathFactory());
+ partitionType,
+ keyType,
+ valueType,
+ options.manifestFormat(),
+ pathFactory(),
+ options.manifestTargetSize().getBytes());
}
@VisibleForTesting
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 77b3ba7..6382817 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -18,21 +18,26 @@
package org.apache.flink.table.store.file.manifest;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.stats.FieldStatsCollector;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RollingFile;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -41,23 +46,33 @@ import java.util.List;
*/
public class ManifestFile {
+ private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
+
private final RowType partitionType;
private final ManifestEntrySerializer serializer;
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
private final BulkWriter.Factory<RowData> writerFactory;
private final FileStorePathFactory pathFactory;
+ private final long suggestedFileSize;
private ManifestFile(
RowType partitionType,
ManifestEntrySerializer serializer,
BulkFormat<RowData, FileSourceSplit> readerFactory,
BulkWriter.Factory<RowData> writerFactory,
- FileStorePathFactory pathFactory) {
+ FileStorePathFactory pathFactory,
+ long suggestedFileSize) {
this.partitionType = partitionType;
this.serializer = serializer;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.pathFactory = pathFactory;
+ this.suggestedFileSize = suggestedFileSize;
+ }
+
+ @VisibleForTesting
+ public long suggestedFileSize() {
+ return suggestedFileSize;
}
public List<ManifestEntry> read(String fileName) {
@@ -70,34 +85,56 @@ public class ManifestFile {
}
/**
- * Write several {@link ManifestEntry}s into a manifest file.
+ * Write several {@link ManifestEntry}s into manifest files.
*
* <p>NOTE: This method is atomic.
*/
- public ManifestFileMeta write(List<ManifestEntry> entries) {
+ public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
Preconditions.checkArgument(
entries.size() > 0, "Manifest entries to write must not be empty.");
- Path path = pathFactory.newManifestFile();
+ ManifestRollingFile rollingFile = new ManifestRollingFile();
+ List<ManifestFileMeta> result = new ArrayList<>();
+ List<Path> filesToCleanUp = new ArrayList<>();
try {
- return write(entries, path);
+ rollingFile.write(entries.iterator(), result, filesToCleanUp);
} catch (Throwable e) {
- FileUtils.deleteOrWarn(path);
- throw new RuntimeException(
- "Exception occurs when writing manifest file " + path + ". Clean up.", e);
+ LOG.warn("Exception occurs when writing manifest files. Cleaning up.", e);
+ for (Path path : filesToCleanUp) {
+ FileUtils.deleteOrWarn(path);
+ }
+ throw new RuntimeException(e);
}
+ return result;
+ }
+
+ public void delete(String fileName) {
+ FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
}
- private ManifestFileMeta write(List<ManifestEntry> entries, Path path) throws IOException {
- FSDataOutputStream out =
- path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
- BulkWriter<RowData> writer = writerFactory.create(out);
- long numAddedFiles = 0;
- long numDeletedFiles = 0;
- FieldStatsCollector statsCollector = new FieldStatsCollector(partitionType);
+ private class ManifestRollingFile extends RollingFile<ManifestEntry, ManifestFileMeta> {
+
+ private long numAddedFiles;
+ private long numDeletedFiles;
+ private FieldStatsCollector statsCollector;
+
+ private ManifestRollingFile() {
+ super(suggestedFileSize);
+ resetMeta();
+ }
- for (ManifestEntry entry : entries) {
- writer.addElement(serializer.toRow(entry));
+ @Override
+ protected Path newPath() {
+ return pathFactory.newManifestFile();
+ }
+
+ @Override
+ protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
+ return writerFactory.create(out);
+ }
+
+ @Override
+ protected RowData toRowData(ManifestEntry entry) {
switch (entry.kind()) {
case ADD:
numAddedFiles++;
@@ -107,20 +144,28 @@ public class ManifestFile {
break;
}
statsCollector.collect(entry.partition());
+
+ return serializer.toRow(entry);
}
- writer.finish();
- out.close();
-
- return new ManifestFileMeta(
- path.getName(),
- path.getFileSystem().getFileStatus(path).getLen(),
- numAddedFiles,
- numDeletedFiles,
- statsCollector.extract());
- }
- public void delete(String fileName) {
- FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
+ @Override
+ protected ManifestFileMeta collectFile(Path path) throws IOException {
+ ManifestFileMeta result =
+ new ManifestFileMeta(
+ path.getName(),
+ path.getFileSystem().getFileStatus(path).getLen(),
+ numAddedFiles,
+ numDeletedFiles,
+ statsCollector.extract());
+ resetMeta();
+ return result;
+ }
+
+ private void resetMeta() {
+ numAddedFiles = 0;
+ numDeletedFiles = 0;
+ statsCollector = new FieldStatsCollector(partitionType);
+ }
}
/**
@@ -135,13 +180,15 @@ public class ManifestFile {
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
private final BulkWriter.Factory<RowData> writerFactory;
private final FileStorePathFactory pathFactory;
+ private final long suggestedFileSize;
public Factory(
RowType partitionType,
RowType keyType,
RowType rowType,
FileFormat fileFormat,
- FileStorePathFactory pathFactory) {
+ FileStorePathFactory pathFactory,
+ long suggestedFileSize) {
this.partitionType = partitionType;
this.keyType = keyType;
this.rowType = rowType;
@@ -149,6 +196,7 @@ public class ManifestFile {
this.readerFactory = fileFormat.createReaderFactory(entryType);
this.writerFactory = fileFormat.createWriterFactory(entryType);
this.pathFactory = pathFactory;
+ this.suggestedFileSize = suggestedFileSize;
}
public ManifestFile create() {
@@ -157,7 +205,8 @@ public class ManifestFile {
new ManifestEntrySerializer(partitionType, keyType, rowType),
readerFactory,
writerFactory,
- pathFactory);
+ pathFactory,
+ suggestedFileSize);
}
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index be1d629..4ee1c6d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -27,12 +27,10 @@ import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
/** Metadata of a manifest file. */
public class ManifestFileMeta {
@@ -125,56 +123,40 @@ public class ManifestFileMeta {
}
/**
- * Merge several {@link ManifestFileMeta}s with several {@link ManifestEntry}s. {@link
- * ManifestEntry}s representing first adding and then deleting the same sst file will cancel
- * each other.
+ * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
+ * then deleting the same sst file will cancel each other.
*
* <p>NOTE: This method is atomic.
*/
public static List<ManifestFileMeta> merge(
List<ManifestFileMeta> metas,
- List<ManifestEntry> entries,
ManifestFile manifestFile,
long suggestedMetaSize,
int suggestedMinMetaCount) {
List<ManifestFileMeta> result = new ArrayList<>();
// these are the newly created manifest files, clean them up if exception occurs
List<ManifestFileMeta> newMetas = new ArrayList<>();
- List<ManifestFileMeta> candidate = new ArrayList<>();
+ List<ManifestFileMeta> candidates = new ArrayList<>();
long totalSize = 0;
- int metaCount = 0;
try {
// merge existing manifests first
for (ManifestFileMeta manifest : metas) {
totalSize += manifest.fileSize;
- metaCount += 1;
- candidate.add(manifest);
- if (totalSize >= suggestedMetaSize || metaCount >= suggestedMinMetaCount) {
+ candidates.add(manifest);
+ if (totalSize >= suggestedMetaSize) {
// reach suggested file size, perform merging and produce new file
- if (candidate.size() == 1) {
- result.add(candidate.get(0));
- } else {
- mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile)
- .ifPresent(
- merged -> {
- newMetas.add(merged);
- result.add(merged);
- });
- }
-
- candidate.clear();
+ mergeCandidates(candidates, manifestFile, result, newMetas);
+ candidates.clear();
totalSize = 0;
- metaCount = 0;
}
}
- // both size and count conditions not satisfied, create new file from entries
- result.addAll(candidate);
- if (entries.size() > 0) {
- ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
- newMetas.add(newManifestFileMeta);
- result.add(newManifestFileMeta);
+ // merge the last bit of manifests if there are too many
+ if (candidates.size() >= suggestedMinMetaCount) {
+ mergeCandidates(candidates, manifestFile, result, newMetas);
+ } else {
+ result.addAll(candidates);
}
} catch (Throwable e) {
// exception occurs, clean up and rethrow
@@ -187,16 +169,25 @@ public class ManifestFileMeta {
return result;
}
- private static Optional<ManifestFileMeta> mergeIntoOneFile(
- List<ManifestFileMeta> metas, List<ManifestEntry> entries, ManifestFile manifestFile) {
+ private static void mergeCandidates(
+ List<ManifestFileMeta> candidates,
+ ManifestFile manifestFile,
+ List<ManifestFileMeta> result,
+ List<ManifestFileMeta> newMetas) {
+ if (candidates.size() == 1) {
+ result.add(candidates.get(0));
+ return;
+ }
+
Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
- for (ManifestFileMeta manifest : metas) {
+ for (ManifestFileMeta manifest : candidates) {
mergeEntries(manifestFile.read(manifest.fileName), map);
}
- mergeEntries(entries, map);
- return map.isEmpty()
- ? Optional.empty()
- : Optional.of(manifestFile.write(new ArrayList<>(map.values())));
+ if (!map.isEmpty()) {
+ List<ManifestFileMeta> merged = manifestFile.write(new ArrayList<>(map.values()));
+ result.addAll(merged);
+ newMetas.addAll(merged);
+ }
}
private static void mergeEntries(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
index 91e7ae3..5c18aa6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.mergetree.sst;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -32,6 +31,7 @@ import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RollingFile;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;
@@ -91,40 +91,20 @@ public class SstFileWriter {
*/
public List<SstFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
throws Exception {
+ SstRollingFile rollingFile = new StatsCollectingRollingFile(level);
List<SstFileMeta> result = new ArrayList<>();
-
- RollingFile rollingFile = null;
- Path currentPath = null;
+ List<Path> filesToCleanUp = new ArrayList<>();
try {
- while (iterator.hasNext()) {
- if (rollingFile == null) {
- currentPath = pathFactory.newPath();
- rollingFile = new RollingFile(currentPath, suggestedFileSize);
- }
- rollingFile.write(iterator.next());
- if (rollingFile.exceedsSuggestedFileSize()) {
- result.add(rollingFile.finish(level));
- rollingFile = null;
- }
- }
- // finish last file
- if (rollingFile != null) {
- result.add(rollingFile.finish(level));
- }
- iterator.close();
+ rollingFile.write(iterator, result, filesToCleanUp);
} catch (Throwable e) {
LOG.warn("Exception occurs when writing sst files. Cleaning up.", e);
- // clean up finished files
- for (SstFileMeta meta : result) {
- FileUtils.deleteOrWarn(pathFactory.toPath(meta.fileName()));
- }
- // clean up in-progress file
- if (currentPath != null) {
- FileUtils.deleteOrWarn(currentPath);
+ for (Path path : filesToCleanUp) {
+ FileUtils.deleteOrWarn(path);
}
throw e;
+ } finally {
+ iterator.close();
}
-
return result;
}
@@ -132,12 +112,9 @@ public class SstFileWriter {
FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
}
- private class RollingFile {
- private final Path path;
- private final long suggestedFileSize;
+ private abstract class SstRollingFile extends RollingFile<KeyValue, SstFileMeta> {
- private final FSDataOutputStream out;
- private final BulkWriter<RowData> writer;
+ private final int level;
private final KeyValueSerializer serializer;
private final RowDataSerializer keySerializer;
@@ -147,37 +124,30 @@ public class SstFileWriter {
private long minSequenceNumber;
private long maxSequenceNumber;
- private RollingFile(Path path, long suggestedFileSize) throws IOException {
- this.path = path;
- this.suggestedFileSize = suggestedFileSize;
-
- this.out =
- this.path.getFileSystem().create(this.path, FileSystem.WriteMode.NO_OVERWRITE);
- this.writer = writerFactory.create(out);
+ private SstRollingFile(int level) {
+ super(suggestedFileSize);
+ this.level = level;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.keySerializer = new RowDataSerializer(keyType);
+ resetMeta();
+ }
- this.rowCount = 0;
- this.minKey = null;
- this.maxKey = null;
- this.minSequenceNumber = Long.MAX_VALUE;
- this.maxSequenceNumber = Long.MIN_VALUE;
+ @Override
+ protected Path newPath() {
+ return pathFactory.newPath();
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new sst file " + path);
- }
+ @Override
+ protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
+ return writerFactory.create(out);
}
- private void write(KeyValue kv) throws IOException {
+ @Override
+ protected RowData toRowData(KeyValue kv) {
if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Writing key-value to sst file "
- + path
- + ", kv: "
- + kv.toString(keyType, valueType));
+ LOG.debug("Writing key-value to sst file, kv: " + kv.toString(keyType, valueType));
}
- writer.addElement(serializer.toRow(kv));
rowCount++;
if (minKey == null) {
minKey = keySerializer.toBinaryRow(kv.key()).copy();
@@ -185,17 +155,46 @@ public class SstFileWriter {
maxKey = kv.key();
minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+
+ return serializer.toRow(kv);
}
- private boolean exceedsSuggestedFileSize() throws IOException {
- // NOTE: this method is inaccurate for formats buffering changes in memory
- return out.getPos() >= suggestedFileSize;
+ @Override
+ protected SstFileMeta collectFile(Path path) throws IOException {
+ SstFileMeta result =
+ new SstFileMeta(
+ path.getName(),
+ FileUtils.getFileSize(path),
+ rowCount,
+ minKey,
+ keySerializer.toBinaryRow(maxKey).copy(),
+ collectStats(path),
+ minSequenceNumber,
+ maxSequenceNumber,
+ level);
+ resetMeta();
+ return result;
}
- private SstFileMeta finish(int level) throws IOException {
- writer.finish();
- out.close();
+ private void resetMeta() {
+ rowCount = 0;
+ minKey = null;
+ maxKey = null;
+ minSequenceNumber = Long.MAX_VALUE;
+ maxSequenceNumber = Long.MIN_VALUE;
+ }
+ protected abstract FieldStats[] collectStats(Path path);
+ }
+
+ private class StatsCollectingRollingFile extends SstRollingFile {
+
+ private StatsCollectingRollingFile(int level) {
+ super(level);
+ }
+
+ @Override
+ protected FieldStats[] collectStats(Path path) {
// TODO
// 1. Read statistics directly from the written orc/parquet files.
// 2. For other file formats use StatsCollector. Make sure fields are not reused
@@ -204,17 +203,7 @@ public class SstFileWriter {
for (int i = 0; i < stats.length; i++) {
stats[i] = new FieldStats(null, null, 0);
}
-
- return new SstFileMeta(
- path.getName(),
- FileUtils.getFileSize(path),
- rowCount,
- minKey,
- keySerializer.toBinaryRow(maxKey).copy(),
- stats,
- minSequenceNumber,
- maxSequenceNumber,
- level);
+ return stats;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index d8f8211..1bfdf1b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -313,10 +313,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
newMetas.addAll(
ManifestFileMeta.merge(
oldMetas,
- changes,
manifestFile,
manifestTargetSize.getBytes(),
manifestMergeMinCount));
+ // write new changes into manifest files
+ if (!changes.isEmpty()) {
+ newMetas.addAll(manifestFile.write(changes));
+ }
// prepare snapshot file
manifestListName = manifestList.write(newMetas);
newSnapshot =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
new file mode 100644
index 0000000..aeb0d90
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A utility class to write a sequence of records into several files, rolling over to a new file
+ * whenever the current one reaches the suggested size.
+ *
+ * <p>Files are created lazily: no file is created if the input iterator is empty. Every created
+ * path is reported through {@code filesToCleanUp} as soon as it is created, so the caller can
+ * delete partially written files if {@link #write} fails.
+ *
+ * @param <R> record type
+ * @param <F> file meta type
+ */
+public abstract class RollingFile<R, F> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RollingFile.class);
+
+    private final long suggestedFileSize;
+
+    /**
+     * @param suggestedFileSize target size in bytes; the current file is closed as soon as its
+     *     stream position reaches this value after a record is written, so files may end up
+     *     slightly larger than suggested
+     */
+    public RollingFile(long suggestedFileSize) {
+        this.suggestedFileSize = suggestedFileSize;
+    }
+
+    /** Create the path for a new file. */
+    protected abstract Path newPath();
+
+    /** Create a new object writer on top of {@code out}. Called once per file. */
+    protected abstract BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException;
+
+    /**
+     * Called before writing a record into file. Per-record calculation can be performed here.
+     *
+     * @param record record to write
+     * @return serialized record
+     */
+    protected abstract RowData toRowData(R record);
+
+    /**
+     * Called after the current file has been finished and its stream closed. Per-file calculation
+     * (e.g. reading the final file size) can be performed here.
+     *
+     * @param path path of the file that has just been completed
+     * @return meta of the completed file, appended to the {@code result} list of {@link #write}
+     */
+    protected abstract F collectFile(Path path) throws IOException;
+
+    /**
+     * Writes all records from {@code iterator}, starting a new file whenever the current one
+     * exceeds the suggested size.
+     *
+     * @param iterator records to write
+     * @param result receives the meta of every completed file, in creation order
+     * @param filesToCleanUp receives every created path (including the in-progress one), so the
+     *     caller can delete them if this method throws
+     * @throws IOException if creating, writing or finishing a file fails; the stream of a file
+     *     still in progress at that point is closed before the exception is propagated
+     */
+    public void write(Iterator<R> iterator, List<F> result, List<Path> filesToCleanUp)
+            throws IOException {
+        Writer writer = null;
+        Path currentPath = null;
+
+        try {
+            while (iterator.hasNext()) {
+                if (writer == null) {
+                    // create new rolling file; register it for clean-up before writing anything
+                    currentPath = newPath();
+                    filesToCleanUp.add(currentPath);
+                    writer = new Writer(currentPath);
+                }
+
+                RowData serialized = toRowData(iterator.next());
+                writer.write(serialized);
+
+                if (writer.exceedsSuggestedFileSize()) {
+                    // exceeds suggested size, close current file
+                    writer.finish();
+                    // stream is closed now; do not close it again if collectFile fails
+                    writer = null;
+                    result.add(collectFile(currentPath));
+                }
+            }
+
+            // finish last file
+            if (writer != null) {
+                writer.finish();
+                writer = null;
+                result.add(collectFile(currentPath));
+            }
+        } catch (Throwable t) {
+            // release the in-progress stream so the caller can safely delete the file
+            if (writer != null) {
+                writer.abort(t);
+            }
+            throw t;
+        }
+    }
+
+    /** Writes records into a single file and tracks its size. */
+    private class Writer {
+        private final FSDataOutputStream out;
+        private final BulkWriter<RowData> writer;
+
+        private Writer(Path path) throws IOException {
+            this.out = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
+            try {
+                this.writer = newWriter(out);
+            } catch (Throwable t) {
+                // the stream is already open; don't leak it if the format writer can't be built
+                abort(t);
+                throw t;
+            }
+
+            LOG.debug("Create new rolling file {}", path);
+        }
+
+        private void write(RowData record) throws IOException {
+            writer.addElement(record);
+        }
+
+        private boolean exceedsSuggestedFileSize() throws IOException {
+            // NOTE: this method is inaccurate for formats buffering changes in memory
+            return out.getPos() >= suggestedFileSize;
+        }
+
+        /** Finishes the format writer, then closes the underlying stream. */
+        private void finish() throws IOException {
+            writer.finish();
+            out.close();
+        }
+
+        /** Closes the underlying stream after a failure, recording close errors on {@code cause}. */
+        private void abort(Throwable cause) {
+            try {
+                out.close();
+            } catch (Throwable t) {
+                if (t != cause) {
+                    cause.addSuppressed(t);
+                }
+            }
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 8bf995f..9296a31 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -35,15 +35,18 @@ import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -70,30 +73,28 @@ public class ManifestFileMetaTest {
manifestFile = createManifestFile(tempDir.toString());
}
- @Test
- public void testMerge() {
+ @ParameterizedTest
+ @ValueSource(ints = {2, 3, 4})
+ public void testMerge(int numLastBits) {
List<ManifestFileMeta> input = new ArrayList<>();
- List<ManifestEntry> entries = new ArrayList<>();
List<ManifestFileMeta> expected = new ArrayList<>();
- createData(input, entries, expected);
+ createData(numLastBits, input, expected);
- List<ManifestFileMeta> actual =
- ManifestFileMeta.merge(input, entries, manifestFile, 500, 3);
+ List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, manifestFile, 500, 3);
assertThat(actual).hasSameSizeAs(expected);
- // these three manifest files are merged from the input
+ // these two manifest files are merged from the input
assertSameContent(expected.get(0), actual.get(0), manifestFile);
assertSameContent(expected.get(1), actual.get(1), manifestFile);
- assertSameContent(expected.get(4), actual.get(4), manifestFile);
- // these four manifest files should be kept without modification
+ // these two manifest files should be kept without modification
assertThat(actual.get(2)).isEqualTo(input.get(5));
assertThat(actual.get(3)).isEqualTo(input.get(6));
- assertThat(actual.get(5)).isEqualTo(input.get(10));
- assertThat(actual.get(6)).isEqualTo(input.get(11));
- // this manifest file should be created from entries
- assertSameContent(expected.get(7), actual.get(7), manifestFile);
+ // check last bits
+ for (int i = 4; i < actual.size(); i++) {
+ assertSameContent(expected.get(i), actual.get(i), manifestFile);
+ }
}
private void assertSameContent(
@@ -112,14 +113,13 @@ public class ManifestFileMetaTest {
public void testCleanUpForException() throws IOException {
FailingAtomicRenameFileSystem.get().reset(1, 10);
List<ManifestFileMeta> input = new ArrayList<>();
- List<ManifestEntry> entries = new ArrayList<>();
- createData(input, entries, null);
+ createData(ThreadLocalRandom.current().nextInt(5), input, null);
ManifestFile failingManifestFile =
createManifestFile(
FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
try {
- ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 30);
+ ManifestFileMeta.merge(input, failingManifestFile, 500, 3);
} catch (Throwable e) {
assertThat(e)
.hasRootCauseExactlyInstanceOf(
@@ -146,24 +146,20 @@ public class ManifestFileMetaTest {
KEY_TYPE,
ROW_TYPE,
avro,
- new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"))
+ new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"),
+ Long.MAX_VALUE)
.create();
}
private void createData(
- List<ManifestFileMeta> input,
- List<ManifestEntry> entries,
- List<ManifestFileMeta> expected) {
+ int numLastBits, List<ManifestFileMeta> input, List<ManifestFileMeta> expected) {
// suggested size 500 and suggested count 3
// file sizes:
// 200, 300, -- multiple files exactly the suggested size
// 100, 200, 300, -- multiple files exceeding the suggested size
// 500, -- single file exactly the suggested size
// 600, -- single file exceeding the suggested size
- // 100, 100, 100, -- multiple files exceeding the suggested count
- // 100, -- the last bit, not enough size or count, won't merge
- // 300, -- the last bit, not enough size or count, won't merge
- // 200, -- file created from entries
+ // 100 * numLastBits -- the last bit
input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B")));
input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"), makeEntry(true, "D")));
@@ -189,15 +185,9 @@ public class ManifestFileMetaTest {
makeEntry(false, "K"),
makeEntry(true, "L")));
- input.add(makeManifest(makeEntry(true, "M")));
- input.add(makeManifest(makeEntry(true, "N")));
- input.add(makeManifest(makeEntry(true, "O")));
-
- input.add(makeManifest(makeEntry(true, "P")));
- input.add(makeManifest(makeEntry(false, "Q"), makeEntry(true, "R"), makeEntry(true, "S")));
-
- entries.add(makeEntry(false, "S"));
- entries.add(makeEntry(true, "T"));
+ for (int i = 0; i < numLastBits; i++) {
+ input.add(makeManifest(makeEntry(true, String.valueOf(i))));
+ }
if (expected == null) {
return;
@@ -208,15 +198,22 @@ public class ManifestFileMetaTest {
expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true, "F")));
expected.add(input.get(5));
expected.add(input.get(6));
- expected.add(
- makeManifest(makeEntry(true, "M"), makeEntry(true, "N"), makeEntry(true, "O")));
- expected.add(input.get(10));
- expected.add(input.get(11));
- expected.add(makeManifest(makeEntry(false, "S"), makeEntry(true, "T")));
+
+ if (numLastBits < 3) {
+ for (int i = 0; i < numLastBits; i++) {
+ expected.add(input.get(7 + i));
+ }
+ } else {
+ expected.add(
+ makeManifest(
+ IntStream.range(0, numLastBits)
+ .mapToObj(i -> makeEntry(true, String.valueOf(i)))
+ .toArray(ManifestEntry[]::new)));
+ }
}
private ManifestFileMeta makeManifest(ManifestEntry... entries) {
- ManifestFileMeta writtenMeta = manifestFile.write(Arrays.asList(entries));
+ ManifestFileMeta writtenMeta = manifestFile.write(Arrays.asList(entries)).get(0);
return new ManifestFileMeta(
writtenMeta.fileName(),
entries.length * 100, // for testing purpose
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index aa65636..f1c8c78 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -32,6 +34,8 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -51,10 +55,12 @@ public class ManifestFileTest {
ManifestFileMeta meta = gen.createManifestFileMeta(entries);
ManifestFile manifestFile = createManifestFile(tempDir.toString());
- ManifestFileMeta actualMeta = manifestFile.write(entries);
- // we do not check file name and size as we can't know in advance
- checkMetaIgnoringFileNameAndSize(meta, actualMeta);
- List<ManifestEntry> actualEntries = manifestFile.read(actualMeta.fileName());
+ List<ManifestFileMeta> actualMetas = manifestFile.write(entries);
+ checkRollingFiles(meta, actualMetas, manifestFile.suggestedFileSize());
+ List<ManifestEntry> actualEntries =
+ actualMetas.stream()
+ .flatMap(m -> manifestFile.read(m.fileName()).stream())
+ .collect(Collectors.toList());
assertThat(actualEntries).isEqualTo(entries);
}
@@ -90,19 +96,39 @@ public class ManifestFileTest {
FileStorePathFactory pathFactory =
new FileStorePathFactory(
new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
+ int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
return new ManifestFile.Factory(
TestKeyValueGenerator.PARTITION_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
avro,
- pathFactory)
+ pathFactory,
+ suggestedFileSize)
.create();
}
- private void checkMetaIgnoringFileNameAndSize(
- ManifestFileMeta expected, ManifestFileMeta actual) {
- assertThat(actual.numAddedFiles()).isEqualTo(expected.numAddedFiles());
- assertThat(actual.numDeletedFiles()).isEqualTo(expected.numDeletedFiles());
- assertThat(actual.partitionStats()).isEqualTo(expected.partitionStats());
+    private void checkRollingFiles(
+            ManifestFileMeta expected, List<ManifestFileMeta> actual, long suggestedFileSize) {
+        // every file except the last one is expected to be no smaller than suggestedFileSize
+        for (int i = 0; i + 1 < actual.size(); i++) {
+            assertThat(actual.get(i).fileSize()).isGreaterThanOrEqualTo(suggestedFileSize);
+        }
+
+        // numAddedFiles of all rolled files must sum up to expected.numAddedFiles
+        assertThat(actual.stream().mapToLong(ManifestFileMeta::numAddedFiles).sum())
+                .isEqualTo(expected.numAddedFiles());
+
+        // numDeletedFiles of all rolled files must sum up to expected.numDeletedFiles
+        assertThat(actual.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum())
+                .isEqualTo(expected.numDeletedFiles());
+
+        // for each partition field, the per-file stats must merge into the expected stats
+        for (int i = 0; i < expected.partitionStats().length; i++) {
+            List<FieldStats> actualStats = new ArrayList<>();
+            for (ManifestFileMeta meta : actual) {
+                actualStats.add(meta.partitionStats()[i]);
+            }
+            StatsTestUtils.checkRollingFileStats(expected.partitionStats()[i], actualStats);
+        }
     }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index e5bde46..00f44e7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializerTest;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FlushingAvroFormat;
@@ -297,35 +296,4 @@ public class SstFileTest {
assertThat(meta.level()).isEqualTo(expected.level());
}
}
-
- @SuppressWarnings("unchecked")
- private void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
- if (expected.minValue() instanceof Comparable) {
- Object actualMin = null;
- Object actualMax = null;
- for (FieldStats stats : actual) {
- if (stats.minValue() != null
- && (actualMin == null
- || ((Comparable<Object>) stats.minValue()).compareTo(actualMin)
- < 0)) {
- actualMin = stats.minValue();
- }
- if (stats.maxValue() != null
- && (actualMax == null
- || ((Comparable<Object>) stats.maxValue()).compareTo(actualMax)
- > 0)) {
- actualMax = stats.maxValue();
- }
- }
- assertThat(actualMin).isEqualTo(expected.minValue());
- assertThat(actualMax).isEqualTo(expected.maxValue());
- } else {
- for (FieldStats stats : actual) {
- assertThat(stats.minValue()).isNull();
- assertThat(stats.maxValue()).isNull();
- }
- }
- assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
- .isEqualTo(expected.nullCount());
- }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
new file mode 100644
index 0000000..379b600
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Utils for stats related tests. */
+public final class StatsTestUtils {
+
+    private StatsTestUtils() {}
+
+    /**
+     * Asserts that the stats of one field, collected separately for each rolling file, are
+     * consistent with the stats of the same field collected over all records.
+     *
+     * <p>If the expected min value is {@link Comparable}, the smallest non-null min value and the
+     * largest non-null max value in {@code actual} must equal the expected min and max values.
+     * Otherwise every element of {@code actual} must have null min and max values. In both cases
+     * the null counts in {@code actual} must sum up to the expected null count.
+     *
+     * @param expected stats of the field over all records
+     * @param actual stats of the same field, one element per rolling file
+     */
+    @SuppressWarnings("unchecked")
+    public static void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
+        if (expected.minValue() instanceof Comparable) {
+            Object actualMin = null;
+            Object actualMax = null;
+            for (FieldStats stats : actual) {
+                // stats with a null min / max value are skipped when merging
+                if (stats.minValue() != null
+                        && (actualMin == null
+                                || ((Comparable<Object>) stats.minValue()).compareTo(actualMin)
+                                        < 0)) {
+                    actualMin = stats.minValue();
+                }
+                if (stats.maxValue() != null
+                        && (actualMax == null
+                                || ((Comparable<Object>) stats.maxValue()).compareTo(actualMax)
+                                        > 0)) {
+                    actualMax = stats.maxValue();
+                }
+            }
+            assertThat(actualMin)
+                    .as("min value over all rolling files")
+                    .isEqualTo(expected.minValue());
+            assertThat(actualMax)
+                    .as("max value over all rolling files")
+                    .isEqualTo(expected.maxValue());
+        } else {
+            // expected min is null or not Comparable: no file may report a min / max value
+            for (int i = 0; i < actual.size(); i++) {
+                FieldStats fileStats = actual.get(i);
+                assertThat(fileStats.minValue()).as("min value of rolling file %d", i).isNull();
+                assertThat(fileStats.maxValue()).as("max value of rolling file %d", i).isNull();
+            }
+        }
+        assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
+                .as("sum of null counts over all rolling files")
+                .isEqualTo(expected.nullCount());
+    }
+}