Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/27 19:13:08 UTC

[GitHub] [flink] XComp commented on a change in pull request #18543: [FLINK-25431] Implement a file-based JobResultStore

XComp commented on a change in pull request #18543:
URL: https://github.com/apache/flink/pull/18543#discussion_r793813631



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
+
+/** The set of configuration options relating to the Job Result Store. */
+public class JobResultStoreOptions {
+    @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<String> STORAGE_PATH =
+            ConfigOptions.key("job-result-store.storage-path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines where job results should be stored. This should be an "
+                                                    + "underlying file-system that provides read-after-write consistency. By "
+                                                    + "default, this is %s.",
+                                            TextElement.code(
+                                                    FileSystemJobResultStore
+                                                            .createDefaultJobResultStorePath(
+                                                                    HighAvailabilityOptions
+                                                                            .HA_STORAGE_PATH
+                                                                            .key(),
+                                                                    HighAvailabilityOptions
+                                                                            .HA_CLUSTER_ID
+                                                                            .key())))
+                                    .build());
+
+    @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<Boolean> DELETE_ON_COMMIT =
+            ConfigOptions.key("job-result-store.delete-on-commit")
+                    .booleanType()
+                    .defaultValue(Boolean.TRUE)
+                    .withDescription(
+                            "Determines whether job results should be automatically removed "
+                                    + "from the underlying job result store when they are clean. If "

Review comment:
       ```suggestion
                                       + "from the underlying job result store when the corresponding entry was transitioned into clean state. If "
   ```
    nit: I only added this one after noticing the other typo below; otherwise, I would have left it as it was. 😇

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
+
+/** The set of configuration options relating to the Job Result Store. */
+public class JobResultStoreOptions {
+    @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<String> STORAGE_PATH =
+            ConfigOptions.key("job-result-store.storage-path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines where job results should be stored. This should be an "
+                                                    + "underlying file-system that provides read-after-write consistency. By "
+                                                    + "default, this is %s.",
+                                            TextElement.code(
+                                                    FileSystemJobResultStore
+                                                            .createDefaultJobResultStorePath(
+                                                                    HighAvailabilityOptions
+                                                                            .HA_STORAGE_PATH
+                                                                            .key(),
+                                                                    HighAvailabilityOptions
+                                                                            .HA_CLUSTER_ID
+                                                                            .key())))
+                                    .build());
+
+    @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<Boolean> DELETE_ON_COMMIT =
+            ConfigOptions.key("job-result-store.delete-on-commit")
+                    .booleanType()
+                    .defaultValue(Boolean.TRUE)
+                    .withDescription(
+                            "Determines whether job results should be automatically removed "
+                                    + "from the underlying job result store when they are clean. If "
+                                    + "false, the cleaned job results are, instead, marked as clean "
+                                    + "to indicate their state. In this case, Flink no longer has"
+                                    + "ownership and the resources need to be cleaned up by the"

Review comment:
       ```suggestion
                                       + "ownership and the resources need to be cleaned up by the "
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
+
+/** The set of configuration options relating to the Job Result Store. */
+public class JobResultStoreOptions {
+    @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<String> STORAGE_PATH =
+            ConfigOptions.key("job-result-store.storage-path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines where job results should be stored. This should be an "
+                                                    + "underlying file-system that provides read-after-write consistency. By "
+                                                    + "default, this is %s.",
+                                            TextElement.code(
+                                                    FileSystemJobResultStore
+                                                            .createDefaultJobResultStorePath(
+                                                                    HighAvailabilityOptions
+                                                                            .HA_STORAGE_PATH
+                                                                            .key(),
+                                                                    HighAvailabilityOptions
+                                                                            .HA_CLUSTER_ID
+                                                                            .key())))
+                                    .build());
+
+    @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<Boolean> DELETE_ON_COMMIT =
+            ConfigOptions.key("job-result-store.delete-on-commit")
+                    .booleanType()
+                    .defaultValue(Boolean.TRUE)
+                    .withDescription(
+                            "Determines whether job results should be automatically removed "
+                                    + "from the underlying job result store when they are clean. If "
+                                    + "false, the cleaned job results are, instead, marked as clean "
+                                    + "to indicate their state. In this case, Flink no longer has"

Review comment:
       ```suggestion
                                       + "to indicate their state. In this case, Flink no longer has "
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStoreTestInternal.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.JobResultEntry;
+import org.apache.flink.runtime.testutils.TestingJobResultStore;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.entry;
+
+/** Tests for the internal {@link EmbeddedJobResultStore} mechanisms. */
+@ExtendWith(TestLoggerExtension.class)
+public class EmbeddedJobResultStoreTestInternal {
+
+    private static final JobResultEntry DUMMY_JOB_RESULT_ENTRY =

Review comment:
    What about making the `JobResultStoreContractTest.DUMMY_JOB_RESULT_ENTRY` public and using it here as well?
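    For illustration, a minimal sketch of how the reuse could look (the test method body is hypothetical; the constant is declared on the `JobResultStoreContractTest` interface, so it is already implicitly `public static final` and a static import is enough):

```java
// Hypothetical sketch, not part of the PR: reusing the contract test's constant
// instead of declaring a duplicate DUMMY_JOB_RESULT_ENTRY in this internal test.
package org.apache.flink.runtime.highavailability.nonha.embedded;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;

import static org.apache.flink.runtime.highavailability.JobResultStoreContractTest.DUMMY_JOB_RESULT_ENTRY;
import static org.assertj.core.api.Assertions.assertThat;

class EmbeddedJobResultStoreTestInternal {

    private EmbeddedJobResultStore embeddedJobResultStore;

    @BeforeEach
    void setup() {
        embeddedJobResultStore = new EmbeddedJobResultStore();
    }

    @Test
    void testCreateDirtyResultRegistersDirtyEntry() throws IOException {
        // Uses the shared constant from the contract test instead of a local copy.
        embeddedJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
        assertThat(embeddedJobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
                .isTrue();
    }
}
```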

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStoreTestInternal.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.JobResultEntry;
+import org.apache.flink.runtime.testutils.TestingJobResultStore;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.entry;
+
+/** Tests for the internal {@link EmbeddedJobResultStore} mechanisms. */
+@ExtendWith(TestLoggerExtension.class)
+public class EmbeddedJobResultStoreTestInternal {

Review comment:
    It feels like this test class is not really necessary anymore. We're already covering all of this in `JobResultStoreContractTest`. Testing the internal representation is not necessary because it's not user-facing (in contrast to the `FileSystemJobResultStore`, where the user interacts with the folder in some use cases).

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.testutils.TestingJobResultStore;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/**
+ * This interface defines a series of tests for any implementation of the {@link JobResultStore} to
+ * determine whether they correctly implement the contracts defined by the interface.
+ */
+public interface JobResultStoreContractTest {
+
+    JobResultEntry DUMMY_JOB_RESULT_ENTRY =
+            new JobResultEntry(TestingJobResultStore.DUMMY_JOB_RESULT);
+
+    JobResultStore createJobResultStore() throws IOException;
+
+    @Test
+    default void testStoreDirtyJobResult() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isTrue();
+    }
+
+    @Test
+    default void testStoreDuplicateJobResultThrowsException() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    default void testStoreCleanedJobResultThrowsException() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    default void testMarkDirtyResultAsClean() throws IOException {

Review comment:
    This one is obsolete: it's already covered by `testHasJobResultEntryWithCleanEntry`.

##########
File path: docs/layouts/shortcodes/generated/common_high_availability_jrs_section.html
##########
@@ -0,0 +1,24 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>job-result-store.delete-on-commit</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Determines whether job results should be automatically removed from the underlying job result store when they are clean. If false, the cleaned job results are, instead, marked as clean to indicate their state. In this case, Flink no longer hasownership and the resources need to be cleaned up by theuser.</td>
+        </tr>
+        <tr>
+            <td><h5>job-result-store.storage-path</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Defines where job results should be stored. This should be an underlying file-system that provides read-after-write consistency. By default, this is <code class="highlighter-rouge">high-availability.storageDir/job-result-store/high-availability.cluster-id</code>.</td>

Review comment:
       ```suggestion
               <td>Defines where job results should be stored. This should be an underlying file-system that provides read-after-write consistency. By default, this is <code class="highlighter-rouge">high-availability.storageDir/job-result-store/high-availability.cluster-id</code>.</td>
   ```
   
    The issue now is that it's difficult to differentiate plain text from variables in the default path. That's why I proposed `FileSystemJobResultStore.createDefaultJobResultStorePath(String.format("{%s}", HighAvailabilityOptions.HA_STORAGE_PATH.key()), String.format("{%s}", HighAvailabilityOptions.HA_CLUSTER_ID.key()))`.
    
    I'm wondering whether we should add a generic `ConfigOption.keyInPathRepresentation` method to `ConfigOption` to make this available as a consistent format for other descriptions as well. But it feels like it's not the right place for this, because `ConfigOption` is actually not meant to carry formatting information. 🤔
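    As a rough illustration only (a sketch — `keyInPathRepresentation` is a hypothetical name and no such method exists today), the helper could also live in a small formatting utility instead of on `ConfigOption` itself:

```java
import org.apache.flink.configuration.ConfigOption;

/** Hypothetical utility for rendering config keys as variable placeholders in example paths. */
public final class ConfigOptionFormatting {

    private ConfigOptionFormatting() {}

    /** Returns e.g. "{high-availability.storageDir}" for HighAvailabilityOptions.HA_STORAGE_PATH. */
    public static String keyInPathRepresentation(ConfigOption<?> option) {
        return String.format("{%s}", option.key());
    }
}
```

    Rendered that way, the default in the generated docs would read `{high-availability.storageDir}/job-result-store/{high-availability.cluster-id}`, which makes the variable parts easy to tell apart from the literal `job-result-store` segment.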
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** An abstract class for threadsafe implementations of the {@link JobResultStore}. */
+public abstract class AbstractThreadsafeJobResultStore implements JobResultStore {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractThreadsafeJobResultStore.class);
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    @Override
+    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
+        Preconditions.checkState(
+                !hasJobResultEntry(jobResultEntry.getJobId()),
+                "Job result store already contains an entry for for job %s",
+                jobResultEntry.getJobId());
+
+        readWriteLock.writeLock().lock();
+        try {
+            createDirtyResultInternal(jobResultEntry);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
+            throws IOException;
+
+    @Override
+    public void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException {
+        if (hasCleanJobResultEntry(jobId)) {
+            LOG.debug("The job {} is already marked as clean. No action required.", jobId);
+            return;
+        }
+
+        readWriteLock.writeLock().lock();
+        try {
+            markResultAsCleanInternal(jobId);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract void markResultAsCleanInternal(JobID jobId)
+            throws IOException, NoSuchElementException;
+
+    @Override
+    public boolean hasJobResultEntry(JobID jobId) throws IOException {
+        readWriteLock.readLock().lock();
+        try {
+            return hasDirtyJobResultEntryInternal(jobId) || hasCleanJobResultEntryInternal(jobId);
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
+        readWriteLock.readLock().lock();
+        try {
+            return hasDirtyJobResultEntryInternal(jobId);
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException;
+
+    @Override
+    public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
+        readWriteLock.readLock().lock();
+        try {
+            return hasCleanJobResultEntryInternal(jobId);
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException;
+
+    @Override
+    public Set<JobResult> getDirtyResults() throws IOException {
+        readWriteLock.readLock().lock();
+        try {
+            return getDirtyResultsInternal();
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract Set<JobResult> getDirtyResultsInternal() throws IOException;

Review comment:
       ```suggestion
       protected abstract Set<JobResult> getDirtyResultsInternal() throws IOException;
       
       
   
       private void withWriteLock(ThrowingRunnable<IOException> runnable) throws IOException {
           readWriteLock.writeLock().lock();
           try {
               runnable.run();
           } finally {
               readWriteLock.writeLock().unlock();
           }
       }
   
       private <T> T withReadLock(SupplierWithException<T, IOException> runnable) throws IOException {
           readWriteLock.readLock().lock();
           try {
               return runnable.get();
           } finally {
               readWriteLock.readLock().unlock();
           }
       }
   ```
    Sorry for not doing it earlier, but I looked once more into what you meant by the lock helper methods not being suitable due to the `IOException`. You're right that we cannot use the implementation from the `EmbeddedJobResultStore` right away. But we could use `SupplierWithException` and `ThrowingRunnable<IOException>`. This would forward the `IOException` through the method signature and enable us to have dedicated methods for utilizing the read and the write lock.
    
    That's another hint: all the default callback interfaces (e.g. `Runnable`, `Function`, `Supplier`, `Consumer`, ...) have corresponding variants that account for exceptions (e.g. `ThrowingRunnable`, `SupplierWithException`).
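    As a minimal sketch (assuming the `withWriteLock(ThrowingRunnable<IOException>)` helper from the suggestion above), `createDirtyResult` would then shrink to:

```java
// Sketch only: relies on the withWriteLock helper proposed in the suggestion above;
// ThrowingRunnable comes from org.apache.flink.util.function.ThrowingRunnable.
@Override
public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
    Preconditions.checkState(
            !hasJobResultEntry(jobResultEntry.getJobId()),
            "Job result store already contains an entry for job %s",
            jobResultEntry.getJobId());

    // The lambda may throw IOException; the helper forwards it through its signature.
    withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
}
```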

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.testutils.TestingJobResultStore;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/**
+ * This interface defines a series of tests for any implementation of the {@link JobResultStore} to
+ * determine whether they correctly implement the contracts defined by the interface.
+ */
+public interface JobResultStoreContractTest {
+
+    JobResultEntry DUMMY_JOB_RESULT_ENTRY =
+            new JobResultEntry(TestingJobResultStore.DUMMY_JOB_RESULT);
+
+    JobResultStore createJobResultStore() throws IOException;
+
+    @Test
+    default void testStoreDirtyJobResult() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isTrue();
+    }
+
+    @Test
+    default void testStoreDuplicateJobResultThrowsException() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    default void testStoreCleanedJobResultThrowsException() throws IOException {

Review comment:
       ```suggestion
       default void testAddDirtyEntryForAlreadyCleanedJobResultThrowsException() throws IOException {
   ```
   The other name was misleading

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.testutils.TestingJobResultStore;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/**
+ * This interface defines a series of tests for any implementation of the {@link JobResultStore} to
+ * determine whether they correctly implement the contracts defined by the interface.
+ */
+public interface JobResultStoreContractTest {
+
+    JobResultEntry DUMMY_JOB_RESULT_ENTRY =
+            new JobResultEntry(TestingJobResultStore.DUMMY_JOB_RESULT);
+
+    JobResultStore createJobResultStore() throws IOException;
+
+    @Test
+    default void testStoreDirtyJobResult() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isTrue();
+    }

Review comment:
       That one can be removed. It's covered by `testHasJobResultEntryWithDirtyEntry`

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.testutils.TestingJobResultStore;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/**
+ * This interface defines a series of tests for any implementation of the {@link JobResultStore} to
+ * determine whether they correctly implement the contracts defined by the interface.
+ */
+public interface JobResultStoreContractTest {
+
+    JobResultEntry DUMMY_JOB_RESULT_ENTRY =
+            new JobResultEntry(TestingJobResultStore.DUMMY_JOB_RESULT);
+
+    JobResultStore createJobResultStore() throws IOException;
+
+    @Test
+    default void testStoreDirtyJobResult() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isTrue();
+    }
+
+    @Test
+    default void testStoreDuplicateJobResultThrowsException() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
+                .isInstanceOf(IllegalStateException.class);

Review comment:
       ```suggestion
           final JobResultEntry otherEntryWithSameJobId = TestingJobResultStore.createJobResult(DUMMY_JOB_RESULT_ENTRY.getJobId(), ApplicationStatus.FAILED);
           assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithSameJobId))
                   .isInstanceOf(IllegalStateException.class);
   ```
    That way, we make it more explicit that we're actually checking on the JobID and not on the entire instance (the test method name needs to be aligned as well).
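    In case `TestingJobResultStore` does not offer a factory with that signature yet, a sketch of what it could look like (the name mirrors the suggestion above; the builder usage is an assumption):

```java
// Hypothetical addition to TestingJobResultStore: builds an entry with the given job ID
// and application status so tests can create a conflicting entry for the same JobID.
public static JobResultEntry createJobResult(JobID jobId, ApplicationStatus applicationStatus) {
    return new JobResultEntry(
            new JobResult.Builder()
                    .jobId(jobId)
                    .applicationStatus(applicationStatus)
                    .netRuntime(1)
                    .build());
}
```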

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.testutils.TestingJobResultStore;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/**
+ * This interface defines a series of tests for any implementation of the {@link JobResultStore} to
+ * determine whether they correctly implement the contracts defined by the interface.
+ */
+public interface JobResultStoreContractTest {
+
+    JobResultEntry DUMMY_JOB_RESULT_ENTRY =
+            new JobResultEntry(TestingJobResultStore.DUMMY_JOB_RESULT);
+
+    JobResultStore createJobResultStore() throws IOException;
+
+    @Test
+    default void testStoreDirtyJobResult() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isTrue();
+    }
+
+    @Test
+    default void testStoreDuplicateJobResultThrowsException() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    default void testStoreCleanedJobResultThrowsException() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    default void testMarkDirtyResultAsClean() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isTrue();
+    }
+
+    @Test
+    default void testCleaningDuplicateEntryThrowsNoException() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        assertThatNoException()
+                .isThrownBy(
+                        () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()));
+    }
+
+    @Test
+    default void testCleaningNonExistentEntryThrowsException() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        assertThatThrownBy(
+                        () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isInstanceOf(NoSuchElementException.class);
+    }
+
+    @Test
+    default void testHasJobResultEntryWithDirtyEntry() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isTrue();
+        assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isFalse();
+        assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
+    }
+
+    @Test
+    default void testHasJobResultEntryWithCleanEntry() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isFalse();
+        assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+                .isTrue();
+        assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
+    }
+
+    @Test
+    default void testHasJobResultEntryWithEmptyStore() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        JobID jobId = new JobID();
+        assertThat(jobResultStore.hasDirtyJobResultEntry(jobId)).isFalse();
+        assertThat(jobResultStore.hasCleanJobResultEntry(jobId)).isFalse();
+        assertThat(jobResultStore.hasJobResultEntry(jobId)).isFalse();
+    }
+
+    @Test
+    default void testGetDirtyResultsWithNoEntry() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        assertThat(jobResultStore.getDirtyResults()).isEmpty();
+    }
+
+    @Test
+    default void testGetDirtyResultsWithDirtyEntry() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        assertThat(
+                        jobResultStore.getDirtyResults().stream()
+                                .map(JobResult::getJobId)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(DUMMY_JOB_RESULT_ENTRY.getJobId());
+    }
+
+    @Test
+    default void testGetDirtyResultsWithDirtyAndCleanEntry() throws IOException {
+        JobResultStore jobResultStore = createJobResultStore();
+        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+
+        final JobResultEntry otherDirtyJobResultEntry =
+                new JobResultEntry(TestingJobResultStore.createSuccessfulJobResult(new JobID()));
+        jobResultStore.createDirtyResult(otherDirtyJobResultEntry);
+
+        assertThat(
+                        jobResultStore.getDirtyResults().stream()
+                                .map(JobResult::getJobId)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(otherDirtyJobResultEntry.getJobId());

Review comment:
       ```suggestion
           assertThat(jobResultStore.getDirtyResults())
                   .singleElement()
                .extracting(JobResult::getJobId)
                .isEqualTo(otherDirtyJobResultEntry.getJobId());
   ```
    It does the same. Feel free to use whichever version you like; I just wanted to show you the `.singleElement()` AssertJ method. I learned about it recently and find it quite handy in situations like this. The same applies to the test method above as well.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** An abstract class for threadsafe implementations of the {@link JobResultStore}. */
+public abstract class AbstractThreadsafeJobResultStore implements JobResultStore {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractThreadsafeJobResultStore.class);
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    @Override
+    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
+        Preconditions.checkState(
+                !hasJobResultEntry(jobResultEntry.getJobId()),
+                "Job result store already contains an entry for for job %s",
+                jobResultEntry.getJobId());
+
+        readWriteLock.writeLock().lock();
+        try {
+            createDirtyResultInternal(jobResultEntry);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
+            throws IOException;
+
+    @Override
+    public void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException {
+        if (hasCleanJobResultEntry(jobId)) {
+            LOG.debug("The job {} is already marked as clean. No action required.", jobId);
+            return;
+        }
+
+        readWriteLock.writeLock().lock();
+        try {
+            markResultAsCleanInternal(jobId);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract void markResultAsCleanInternal(JobID jobId)
+            throws IOException, NoSuchElementException;
+
+    @Override
+    public boolean hasJobResultEntry(JobID jobId) throws IOException {
+        readWriteLock.readLock().lock();
+        try {
+            return hasDirtyJobResultEntryInternal(jobId) || hasCleanJobResultEntryInternal(jobId);
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
+        readWriteLock.readLock().lock();
+        try {
+            return hasDirtyJobResultEntryInternal(jobId);
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException;
+
+    @Override
+    public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
+        readWriteLock.readLock().lock();
+        try {
+            return hasCleanJobResultEntryInternal(jobId);
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException;
+
+    @Override
+    public Set<JobResult> getDirtyResults() throws IOException {
+        readWriteLock.readLock().lock();
+        try {
+            return getDirtyResultsInternal();
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract Set<JobResult> getDirtyResultsInternal() throws IOException;

Review comment:
       That allows something like 
   ```
       @Override
       public boolean hasJobResultEntry(JobID jobId) throws IOException {
           return withReadLock(
                   () ->
                           hasDirtyJobResultEntryInternal(jobId)
                                   || hasCleanJobResultEntryInternal(jobId));
       }
   ```
    with no additional try/finally boilerplate being necessary.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.testutils.TestingJobResultStore;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the internal {@link FileSystemJobResultStore} mechanisms. */
+@ExtendWith(TestLoggerExtension.class)
+public class FileSystemJobResultStoreTestInternal {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private static final JobResultEntry DUMMY_JOB_RESULT_ENTRY =

Review comment:
       Same here, we could reuse `JobResultStoreContractTest.DUMMY_JOB_RESULT_ENTRY`. WDYT?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java
##########
@@ -20,112 +20,57 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore;
 import org.apache.flink.runtime.highavailability.JobResultEntry;
 import org.apache.flink.runtime.highavailability.JobResultStore;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /** A thread-safe in-memory implementation of the {@link JobResultStore}. */
-public class EmbeddedJobResultStore implements JobResultStore {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedJobResultStore.class);
-
-    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+public class EmbeddedJobResultStore extends AbstractThreadsafeJobResultStore {
 
-    @GuardedBy("readWriteLock")
-    @VisibleForTesting
-    final Map<JobID, JobResultEntry> dirtyJobResults = new HashMap<>();
+    @VisibleForTesting final Map<JobID, JobResultEntry> dirtyJobResults = new HashMap<>();
 
-    @GuardedBy("readWriteLock")
-    @VisibleForTesting
-    final Map<JobID, JobResultEntry> cleanJobResults = new HashMap<>();
+    @VisibleForTesting final Map<JobID, JobResultEntry> cleanJobResults = new HashMap<>();

Review comment:
    See my comment in `EmbeddedJobResultStoreTestInternal`. I guess we can switch to `private` here and drop the `@VisibleForTesting` annotation.
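    A sketch of the resulting fields, assuming the internal test class is dropped and nothing reaches into the maps directly anymore:

```java
// Sketch: with no test accessing the maps directly, the @VisibleForTesting annotation
// and the package-private visibility are no longer needed.
private final Map<JobID, JobResultEntry> dirtyJobResults = new HashMap<>();

private final Map<JobID, JobResultEntry> cleanJobResults = new HashMap<>();
```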




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org