You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/29 05:51:26 UTC
[flink-table-store] branch master updated: [FLINK-28715] Throw better exception when file not found in reading
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 487b05e6 [FLINK-28715] Throw better exception when file not found in reading
487b05e6 is described below
commit 487b05e6d9162d65c1d83d42286d735061864dc1
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jul 29 13:51:21 2022 +0800
[FLINK-28715] Throw better exception when file not found in reading
This closes #250
---
.../table/store/file/data/AppendOnlyReader.java | 4 +---
.../table/store/file/data/DataFileReader.java | 4 +---
.../flink/table/store/file/utils/FileUtils.java | 28 +++++++++++++++++-----
.../flink/table/store/file/data/DataFileTest.java | 9 +++++++
4 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
index 9a7429df..536918f8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
@@ -37,9 +37,7 @@ public class AppendOnlyReader implements RecordReader<RowData> {
public AppendOnlyReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory)
throws IOException {
- long fileSize = FileUtils.getFileSize(path);
- FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize);
- this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+ this.reader = FileUtils.createFormatReader(readerFactory, path); // reads the whole file; throws a descriptive FileNotFoundException if it is missing
}
@Nullable
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index 26cd5c65..5da2a2b0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -83,9 +83,7 @@ public class DataFileReader {
private final KeyValueSerializer serializer;
private DataFileRecordReader(Path path) throws IOException {
- long fileSize = FileUtils.getFileSize(path);
- FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize);
- this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+ this.reader = FileUtils.createFormatReader(readerFactory, path); // reads the whole file; throws a descriptive FileNotFoundException if it is missing
this.serializer = new KeyValueSerializer(keyType, valueType);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
index d625e866..f9af89d2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -54,7 +54,7 @@ public class FileUtils {
private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
private static final int LIST_MAX_RETRY = 30;
- public static final Configuration DEFAULT_READER_CONFIG = new Configuration();
+ private static final Configuration DEFAULT_READER_CONFIG = new Configuration();
static {
DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
@@ -82,11 +82,9 @@ public class FileUtils {
BulkFormat<RowData, FileSourceSplit> readerFactory)
throws IOException {
List<T> result = new ArrayList<>();
- long fileSize = FileUtils.getFileSize(path);
- FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize);
- BulkFormat.Reader<RowData> reader =
- readerFactory.createReader(DEFAULT_READER_CONFIG, split);
- Utils.forEachRemaining(reader, row -> result.add(serializer.fromRow(row)));
+ Utils.forEachRemaining(
+ createFormatReader(readerFactory, path),
+ row -> result.add(serializer.fromRow(row)));
return result;
}
@@ -182,4 +180,22 @@ public class FileUtils {
.filter(name -> name.startsWith(prefix))
.map(name -> Long.parseLong(name.substring(prefix.length())));
}
+
+ /**
+ * Creates a reader over the whole of {@code file} (a single split from offset 0 to the file
+ * length) using {@code format}.
+ *
+ * @param format the bulk format used to decode the file
+ * @param file the file to read
+ * @return a reader positioned at the start of the file
+ * @throws FileNotFoundException if {@code file} does not exist; the message lists likely
+ *     causes (snapshot expiration, slow consumption) and the original exception is kept as
+ *     the cause
+ * @throws IOException if the file status cannot be read or the reader cannot be created
+ */
+ public static BulkFormat.Reader<RowData> createFormatReader(
+ BulkFormat<RowData, FileSourceSplit> format, Path file) throws IOException {
+ long fileSize;
+ try {
+ // One metadata lookup both checks existence and yields the length. A separate
+ // exists() + size call lets a concurrently expired file slip through as a bare
+ // FileNotFoundException and costs an extra round-trip on remote file systems.
+ fileSize = file.getFileSystem().getFileStatus(file).getLen();
+ } catch (FileNotFoundException e) {
+ FileNotFoundException notFound =
+ new FileNotFoundException(
+ String.format(
+ "File '%s' not found. Possible causes: "
+ + "1. snapshot expires too fast, you can configure 'snapshot.time-retained'"
+ + " option with a larger value. "
+ + "2. consumption is too slow, you can improve the performance of consumption"
+ + " (for example, increasing parallelism).",
+ file));
+ // FileNotFoundException has no (message, cause) constructor.
+ notFound.initCause(e);
+ throw notFound;
+ }
+
+ FileSourceSplit split = new FileSourceSplit("ignore", file, 0, fileSize);
+ return format.createReader(DEFAULT_READER_CONFIG, split);
+ }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 6c851525..58c1c424 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -54,6 +54,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link DataFileReader} and {@link DataFileWriter}. */
public class DataFileTest {
@@ -63,6 +64,14 @@ public class DataFileTest {
@TempDir java.nio.file.Path tempDir;
+ @Test
+ public void testReadNonExistentFile() {
+ // A missing data file must surface as a FileNotFoundException whose message tells the
+ // user how to react, so pin both the exception type and the guidance text.
+ DataFileReader reader = createDataFileReader(tempDir.toString(), "avro", null, null);
+ assertThatThrownBy(() -> reader.read("dummy_file"))
+ .isInstanceOf(java.io.FileNotFoundException.class)
+ .hasMessageContaining(
+ "you can configure 'snapshot.time-retained' option with a larger value.");
+ }
+
@RepeatedTest(10)
public void testWriteAndReadDataFileWithStatsCollectingRollingFile() throws Exception {
testWriteAndReadDataFileImpl("avro");