You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/29 05:51:26 UTC

[flink-table-store] branch master updated: [FLINK-28715] Throw better exception when file not found in reading

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 487b05e6 [FLINK-28715] Throw better exception when file not found in reading
487b05e6 is described below

commit 487b05e6d9162d65c1d83d42286d735061864dc1
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jul 29 13:51:21 2022 +0800

    [FLINK-28715] Throw better exception when file not found in reading
    
    This closes #250
---
 .../table/store/file/data/AppendOnlyReader.java    |  4 +---
 .../table/store/file/data/DataFileReader.java      |  4 +---
 .../flink/table/store/file/utils/FileUtils.java    | 28 +++++++++++++++++-----
 .../flink/table/store/file/data/DataFileTest.java  |  9 +++++++
 4 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
index 9a7429df..536918f8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
@@ -37,9 +37,7 @@ public class AppendOnlyReader implements RecordReader<RowData> {
 
     public AppendOnlyReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory)
             throws IOException {
-        long fileSize = FileUtils.getFileSize(path);
-        FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize);
-        this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+        this.reader = FileUtils.createFormatReader(readerFactory, path);
     }
 
     @Nullable
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index 26cd5c65..5da2a2b0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -83,9 +83,7 @@ public class DataFileReader {
         private final KeyValueSerializer serializer;
 
         private DataFileRecordReader(Path path) throws IOException {
-            long fileSize = FileUtils.getFileSize(path);
-            FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize);
-            this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+            this.reader = FileUtils.createFormatReader(readerFactory, path);
             this.serializer = new KeyValueSerializer(keyType, valueType);
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
index d625e866..f9af89d2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -54,7 +54,7 @@ public class FileUtils {
     private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
     private static final int LIST_MAX_RETRY = 30;
 
-    public static final Configuration DEFAULT_READER_CONFIG = new Configuration();
+    private static final Configuration DEFAULT_READER_CONFIG = new Configuration();
 
     static {
         DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
@@ -82,11 +82,9 @@ public class FileUtils {
             BulkFormat<RowData, FileSourceSplit> readerFactory)
             throws IOException {
         List<T> result = new ArrayList<>();
-        long fileSize = FileUtils.getFileSize(path);
-        FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize);
-        BulkFormat.Reader<RowData> reader =
-                readerFactory.createReader(DEFAULT_READER_CONFIG, split);
-        Utils.forEachRemaining(reader, row -> result.add(serializer.fromRow(row)));
+        Utils.forEachRemaining(
+                createFormatReader(readerFactory, path),
+                row -> result.add(serializer.fromRow(row)));
         return result;
     }
 
@@ -182,4 +180,22 @@ public class FileUtils {
                 .filter(name -> name.startsWith(prefix))
                 .map(name -> Long.parseLong(name.substring(prefix.length())));
     }
+
+    public static BulkFormat.Reader<RowData> createFormatReader(
+            BulkFormat<RowData, FileSourceSplit> format, Path file) throws IOException {
+        if (!file.getFileSystem().exists(file)) {
+            throw new FileNotFoundException(
+                    String.format(
+                            "File '%s' not found, Possible causes: "
+                                    + "1.snapshot expires too fast, you can configure 'snapshot.time-retained'"
+                                    + " option with a larger value. "
+                                    + "2.consumption is too slow, you can improve the performance of consumption"
+                                    + " (For example, increasing parallelism).",
+                            file));
+        }
+
+        long fileSize = FileUtils.getFileSize(file);
+        FileSourceSplit split = new FileSourceSplit("ignore", file, 0, fileSize);
+        return format.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 6c851525..58c1c424 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -54,6 +54,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link DataFileReader} and {@link DataFileWriter}. */
 public class DataFileTest {
@@ -63,6 +64,14 @@ public class DataFileTest {
 
     @TempDir java.nio.file.Path tempDir;
 
+    @Test
+    public void testReadNonExistentFile() {
+        DataFileReader reader = createDataFileReader(tempDir.toString(), "avro", null, null);
+        assertThatThrownBy(() -> reader.read("dummy_file"))
+                .hasMessageContaining(
+                        "you can configure 'snapshot.time-retained' option with a larger value.");
+    }
+
     @RepeatedTest(10)
     public void testWriteAndReadDataFileWithStatsCollectingRollingFile() throws Exception {
         testWriteAndReadDataFileImpl("avro");