You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/18 06:27:35 UTC

[flink] 01/02: [FLINK-26698][runtime] Uses the actual basePath instance instead of only the path of Path instance

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 265788bf7a6f12f5c6b417326cb60437a5f313d6
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Thu Mar 17 09:37:31 2022 +0100

    [FLINK-26698][runtime] Uses the actual basePath instance instead of only the path of Path instance
    
    The issue before the fix was, that using getPath would strip
    off the scheme information which causes problems in situations
    where the FileSystem is not the default FileSystem
---
 .../highavailability/FileSystemJobResultStore.java |  9 ++++++--
 .../FileSystemJobResultStoreTestInternal.java      | 25 ++++++++++++++++++++--
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
index a7ed3c5..5c19c2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
@@ -114,7 +114,7 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
      * @return A path for a dirty entry for the given the Job ID.
      */
     private Path constructDirtyPath(JobID jobId) {
-        return new Path(this.basePath.getPath(), jobId.toString() + DIRTY_FILE_EXTENSION);
+        return constructEntryPath(jobId.toString() + DIRTY_FILE_EXTENSION);
     }
 
     /**
@@ -125,7 +125,12 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
      * @return A path for a clean entry for the given the Job ID.
      */
     private Path constructCleanPath(JobID jobId) {
-        return new Path(this.basePath.getPath(), jobId.toString() + ".json");
+        return constructEntryPath(jobId.toString() + ".json");
+    }
+
+    @VisibleForTesting
+    Path constructEntryPath(String fileName) {
+        return new Path(this.basePath, fileName);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
index 3a2de32..20d6f8a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
@@ -51,10 +51,31 @@ public class FileSystemJobResultStoreTestInternal {
 
     @TempDir File temporaryFolder;
 
+    private Path basePath;
+
     @BeforeEach
     public void setupTest() throws IOException {
-        Path path = new Path(temporaryFolder.toURI());
-        fileSystemJobResultStore = new FileSystemJobResultStore(path.getFileSystem(), path, false);
+        basePath = new Path(temporaryFolder.toURI());
+        fileSystemJobResultStore =
+                new FileSystemJobResultStore(basePath.getFileSystem(), basePath, false);
+    }
+
+    @Test
+    public void testValidEntryPathCreation() {
+        final Path entryParent =
+                fileSystemJobResultStore.constructEntryPath("random-name").getParent();
+        assertThat(entryParent)
+                .extracting(FileSystemJobResultStoreTestInternal::stripSucceedingSlash)
+                .isEqualTo(stripSucceedingSlash(basePath));
+    }
+
+    private static String stripSucceedingSlash(Path path) {
+        final String uriStr = path.toUri().toString();
+        if (uriStr.charAt(uriStr.length() - 1) == '/') {
+            return uriStr.substring(0, uriStr.length() - 1);
+        }
+
+        return uriStr;
     }
 
     @Test