Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/07 14:21:22 UTC

[GitHub] [flink] dmvk commented on a change in pull request #18189: [FLINK-25430] Replace RunningJobRegistry by JobResultStore

dmvk commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r780210722



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
##########
@@ -59,20 +72,25 @@ public int processExitCode() {
      * #UNKNOWN}.
      */
     public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
-        if (jobStatus == null) {
+        if (jobStatus == null || !JOB_STATUS_APPLICATION_STATUS_BI_MAP.containsKey(jobStatus)) {

Review comment:
       ```suggestion
           return JOB_STATUS_APPLICATION_STATUS_BI_MAP.getOrDefault(jobStatus, UNKNOWN);
   ```
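   A minimal sketch of the suggested one-liner. It uses hypothetical, trimmed-down enums and a plain JDK `EnumMap` in place of the Guava `BiMap` from the PR (a `BiMap` is also a `Map`, so `getOrDefault` is available there too). Whether the real mapping has exactly these three entries is an assumption. For an `EnumMap`, `getOrDefault(null, ...)` returns the default, so the explicit null check becomes unnecessary.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-ins for Flink's JobStatus and ApplicationStatus enums.
enum JobStatus { CREATED, RUNNING, FINISHED, FAILED, CANCELED, SUSPENDED }

enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

final class StatusMapping {
    // EnumMap: compact, array-backed map for enum keys.
    private static final Map<JobStatus, ApplicationStatus> JOB_TO_APPLICATION_STATUS =
            new EnumMap<>(JobStatus.class);

    static {
        JOB_TO_APPLICATION_STATUS.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
        JOB_TO_APPLICATION_STATUS.put(JobStatus.FAILED, ApplicationStatus.FAILED);
        JOB_TO_APPLICATION_STATUS.put(JobStatus.CANCELED, ApplicationStatus.CANCELED);
    }

    private StatusMapping() {}

    // getOrDefault returns UNKNOWN both for a null argument and for statuses without a
    // mapping (e.g. RUNNING), so no separate null or containsKey() check is needed.
    static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
        return JOB_TO_APPLICATION_STATUS.getOrDefault(jobStatus, ApplicationStatus.UNKNOWN);
    }
}
```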

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java
##########
@@ -49,7 +50,9 @@ protected void onStart() {
                 dispatcherGatewayServiceFactory.create(
                         DispatcherId.fromUuid(getLeaderSessionId()),
                         Collections.singleton(jobGraph),
-                        ThrowingJobGraphWriter.INSTANCE);
+                        Collections.emptyList(),

Review comment:
       Same as with the `MiniDispatcher`: how should the JRS work in the context of per-job mode?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -196,6 +198,8 @@ public Dispatcher(
 
         this.recoveredJobs = new HashSet<>(recoveredJobs);
 
+        this.globallyTerminatedJobs = new HashSet<>(globallyTerminatedJobs);

Review comment:
       Should we wrap both of these in `Collections.unmodifiableSet`?
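   A sketch of the wrapping, with `String` standing in for `JobGraph`/`JobResult` and a hypothetical `RecoveredState` holder. The sketch copies first and then wraps, so neither the caller nor the holder can change the sets afterwards. This only fits if the `Dispatcher` never mutates these sets later on (for example, by clearing them after starting the recovered jobs), which is worth checking before making the change.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical holder illustrating the pattern; String stands in for JobGraph/JobResult.
final class RecoveredState {
    private final Set<String> recoveredJobs;
    private final Set<String> globallyTerminatedJobs;

    RecoveredState(Collection<String> recoveredJobs, Collection<String> globallyTerminatedJobs) {
        // Copy first so later changes to the caller's collections are not visible here,
        // then wrap so that nobody holding these sets can modify them.
        this.recoveredJobs = Collections.unmodifiableSet(new HashSet<>(recoveredJobs));
        this.globallyTerminatedJobs =
                Collections.unmodifiableSet(new HashSet<>(globallyTerminatedJobs));
    }

    Set<String> getRecoveredJobs() {
        return recoveredJobs;
    }

    Set<String> getGloballyTerminatedJobs() {
        return globallyTerminatedJobs;
    }
}
```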

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
##########
@@ -262,7 +272,17 @@ private DispatcherRunner createDispatcherRunner() throws Exception {
         return dispatcherRunnerFactory.createDispatcherRunner(
                 dispatcherLeaderElectionService,
                 fatalErrorHandler,
-                () -> jobGraphStore,
+                new JobPersistenceComponentFactory() {

Review comment:
       Should we have a testing implementation that allows you to pass the stores via the constructor instead (similar to `TestingCheckpointRecoveryFactory`)?
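   A sketch of such a testing factory. `JobGraphStore` and `JobResultStore` are reduced to empty stand-in interfaces so the example compiles on its own. The name of the second factory method (`createJobResultStore()`) is an assumption, because the diff excerpt of `JobPersistenceComponentFactory` cuts off before its signature. A test could then write something like `new TestingJobPersistenceComponentFactory(jobGraphStore, new EmbeddedJobResultStore())` instead of an anonymous class.

```java
import java.util.Objects;

// Hypothetical stand-ins so the sketch compiles on its own; the real interfaces live in flink-runtime.
interface JobGraphStore {}

interface JobResultStore {}

interface JobPersistenceComponentFactory {
    JobGraphStore createJobGraphStore();

    // Assumed name for the second factory method; the diff excerpt does not show it.
    JobResultStore createJobResultStore();
}

// Testing implementation that simply hands out the stores passed in via the constructor.
final class TestingJobPersistenceComponentFactory implements JobPersistenceComponentFactory {
    private final JobGraphStore jobGraphStore;
    private final JobResultStore jobResultStore;

    TestingJobPersistenceComponentFactory(
            JobGraphStore jobGraphStore, JobResultStore jobResultStore) {
        this.jobGraphStore = Objects.requireNonNull(jobGraphStore);
        this.jobResultStore = Objects.requireNonNull(jobResultStore);
    }

    @Override
    public JobGraphStore createJobGraphStore() {
        return jobGraphStore;
    }

    @Override
    public JobResultStore createJobResultStore() {
        return jobResultStore;
    }
}
```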

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -731,16 +741,14 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         jobGraphStore.start(null);
         haServices.setJobGraphStore(jobGraphStore);
 
-        // Track cleanup - running jobs registry
-        haServices.setRunningJobsRegistry(
-                new StandaloneRunningJobsRegistry() {
-
-                    @Override
-                    public void clearJob(JobID jobID) {
-                        super.clearJob(jobID);
-                        cleanUpEvents.add(CLEANUP_RUNNING_JOBS_REGISTRY);
-                    }
-                });
+        // Track cleanup - job result store
+        haServices.setJobResultStore(
+                TestingJobResultStore.builder()
+                        .withMarkResultAsCleanConsumer(
+                                (jobID -> {

Review comment:
       nit: the brackets around the lambda are redundant

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
##########
@@ -130,11 +130,11 @@ public JobGraphStore getJobGraphStore() throws Exception {
     }
 
     @Override
-    public RunningJobsRegistry getRunningJobsRegistry() {
-        if (runningJobsRegistry == null) {
-            this.runningJobsRegistry = createRunningJobsRegistry();
+    public JobResultStore getJobResultStore() throws Exception {
+        if (jobResultStore == null) {

Review comment:
       Do we need the lazy initialization here? This is the only mutable field in the implementation.
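   A sketch of the eager alternative, with a hypothetical stand-in type. The store is handed in once and kept in a `final` field, so the getter needs no null check. The lazy version is a check-then-act sequence, which is only safe if `getJobResultStore()` is never called concurrently. Passing the store into the constructor, rather than calling an overridable `createJobResultStore()` from it, also avoids running subclass code before the subclass is fully constructed.

```java
import java.util.Objects;

// Hypothetical stand-in for the real JobResultStore interface.
interface JobResultStore {}

// Eager initialization: the store is provided once and never reassigned.
class HaServicesSketch {
    private final JobResultStore jobResultStore;

    HaServicesSketch(JobResultStore jobResultStore) {
        // Receiving the store (instead of calling an overridable factory method here)
        // avoids running subclass code before the subclass is fully constructed.
        this.jobResultStore = Objects.requireNonNull(jobResultStore);
    }

    // No null check and no synchronization needed: the field is final and set exactly once.
    JobResultStore getJobResultStore() {
        return jobResultStore;
    }
}
```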

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
##########
@@ -63,6 +63,7 @@ public MiniDispatcher(
                 rpcService,
                 fencingToken,
                 Collections.singleton(jobGraph),
+                Collections.emptyList(),

Review comment:
       Is this correct? Could this break the intended behavior for per-job mode?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
##########
@@ -92,34 +99,61 @@ private void startServices() {
         }
     }
 
-    private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
-        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
+    private void createDispatcherIfRunning(
+            Collection<JobGraph> jobGraphs, Collection<JobResult> globallyTerminatedJobs) {
+        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, globallyTerminatedJobs));
     }
 
-    private void createDispatcher(Collection<JobGraph> jobGraphs) {
+    private void createDispatcher(
+            Collection<JobGraph> jobGraphs, Collection<JobResult> globallyTerminatedJobs) {
 
         final DispatcherGatewayService dispatcherService =
                 dispatcherGatewayServiceFactory.create(
-                        DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore);
+                        DispatcherId.fromUuid(getLeaderSessionId()),
+                        jobGraphs,
+                        globallyTerminatedJobs,
+                        jobGraphStore,
+                        jobResultStore);
 
         completeDispatcherSetup(dispatcherService);
     }
 
-    private CompletableFuture<Collection<JobGraph>> recoverJobsAsync() {
-        return CompletableFuture.supplyAsync(this::recoverJobsIfRunning, ioExecutor);
+    private CompletableFuture<Void>
+            createDispatcherBasedOnRecoveredJobGraphsAndGloballyTerminatedJobs() {
+        return CompletableFuture.supplyAsync(
+                        this::getGloballyCompletedJobResultsIfRunning, ioExecutor)
+                .thenCompose(
+                        globallyTerminatedJobs ->
+                                CompletableFuture.supplyAsync(
+                                                () ->
+                                                        this.recoverJobsIfRunning(
+                                                                globallyTerminatedJobs.stream()
+                                                                        .map(JobResult::getJobId)
+                                                                        .collect(
+                                                                                Collectors
+                                                                                        .toSet())),
+                                                ioExecutor)
+                                        .thenAccept(
+                                                jobGraphs ->
+                                                        createDispatcherIfRunning(
+                                                                jobGraphs, globallyTerminatedJobs))
+                                        .handle(this::onErrorIfRunning));

Review comment:
       ```suggestion
           final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
                   CompletableFuture.supplyAsync(
                           this::getGloballyCompletedJobResultsIfRunning, ioExecutor);
           return dirtyJobsFuture
                   .thenApplyAsync(
                           dirtyJobs ->
                                   recoverJobsIfRunning(
                                           dirtyJobs.stream()
                                                   .map(JobResult::getJobId)
                                                   .collect(Collectors.toSet())),
                           ioExecutor)
                   .thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
                   .handle(this::onErrorIfRunning);
   ```
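   The suggested chain, reduced to a self-contained sketch. Job graphs and job results are replaced by `String` job IDs. `getDirtyJobResults`, `recoverJobs` and `createDispatcher` are hypothetical stand-ins for the methods in `SessionDispatcherLeaderProcess`, and `recoverJobs` is assumed to skip jobs that already have a dirty result. Keeping a reference to the first future lets `thenAcceptBoth` reuse its value without nesting a second `supplyAsync` inside `thenCompose`. The `.handle(this::onErrorIfRunning)` step is left out here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

// Hypothetical stand-ins: job results and job graphs are reduced to their job IDs (Strings).
final class RecoverySketch {
    private final Executor ioExecutor;
    volatile Collection<String> createdWithJobGraphs;
    volatile Collection<String> createdWithDirtyJobs;

    RecoverySketch(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    Collection<String> getDirtyJobResults() {
        return Arrays.asList("job-1");
    }

    // Recovers every persisted job graph except those that already have a dirty result.
    Collection<String> recoverJobs(Set<String> dirtyJobIds) {
        return Arrays.asList("job-1", "job-2").stream()
                .filter(jobId -> !dirtyJobIds.contains(jobId))
                .collect(Collectors.toList());
    }

    void createDispatcher(Collection<String> jobGraphs, Collection<String> dirtyJobs) {
        createdWithJobGraphs = jobGraphs;
        createdWithDirtyJobs = dirtyJobs;
    }

    CompletableFuture<Void> createDispatcherBasedOnRecoveredState() {
        // Keep a handle on the first stage so its value can be reused later without nesting.
        final CompletableFuture<Collection<String>> dirtyJobsFuture =
                CompletableFuture.supplyAsync(this::getDirtyJobResults, ioExecutor);
        return dirtyJobsFuture
                .thenApplyAsync(
                        dirtyJobs -> recoverJobs(dirtyJobs.stream().collect(Collectors.toSet())),
                        ioExecutor)
                // Combine the recovered job graphs with the (already completed) dirty jobs.
                .thenAcceptBoth(dirtyJobsFuture, this::createDispatcher);
    }
}
```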

##########
File path: flink-core/src/main/java/org/apache/flink/util/function/QuintFunctionWithException.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
+
+/**
+ * Function which takes five arguments.
+ *
+ * @param <S> type of the first argument
+ * @param <T> type of the second argument
+ * @param <U> type of the third argument
+ * @param <V> type of the fourth argument
+ * @param <W> type of the fifth argument
+ * @param <R> type of the return value
+ * @param <E> type of the thrown exception
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface QuintFunctionWithException<S, T, U, V, W, R, E extends Throwable> {

Review comment:
       Same as for the `QuintFunction`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -868,6 +867,14 @@ private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) {
         blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
+    private void markJobAsClean(JobID jobId) {
+        try {
+            jobResultStore.markResultAsClean(jobId);
+        } catch (IOException e) {
+            log.warn("Could not properly mark job {} result as clean.", jobId, e);

Review comment:
       This is going to be addressed in the upcoming PR, right?

##########
File path: flink-core/src/main/java/org/apache/flink/util/function/QuintFunction.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Function which takes five arguments.
+ *
+ * @param <S> type of the first argument
+ * @param <T> type of the second argument
+ * @param <U> type of the third argument
+ * @param <V> type of the fourth argument
+ * @param <W> type of the fifth argument
+ * @param <R> type of the return value
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface QuintFunction<S, T, U, V, W, R> {

Review comment:
       I know that this is used for injecting functions into test instances, but I think that with five parameters, implementations are just too complex without parameter names.
   
   In this case, the function is no different from the `DispatcherGatewayServiceFactory` it "mocks". Wouldn't reusing the original interfaces be sufficient and simpler here?
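   A sketch of what reusing the original interface could look like. The parameter list mirrors the five arguments passed in `SessionDispatcherLeaderProcess#createDispatcher`, but all types here are hypothetical stand-ins. Tests can still pass a lambda, while the interface keeps the parameter names and documentation that `QuintFunction<S, T, U, V, W, R>` loses.

```java
import java.util.Collection;

// Hypothetical stand-ins for the real Flink types.
interface DispatcherGatewayService {}

interface JobGraphWriter {}

interface JobResultStore {}

// A named, single-method interface: same shape as a five-argument function type,
// but every argument carries a descriptive name.
@FunctionalInterface
interface DispatcherGatewayServiceFactory {
    DispatcherGatewayService create(
            String dispatcherId,
            Collection<String> recoveredJobs,
            Collection<String> recoveredDirtyJobResults,
            JobGraphWriter jobGraphWriter,
            JobResultStore jobResultStore);
}

final class TestingFactories {
    private TestingFactories() {}

    // Test helper: ignores all inputs and always returns the given service.
    static DispatcherGatewayServiceFactory returning(DispatcherGatewayService service) {
        return (dispatcherId, recoveredJobs, recoveredDirtyJobResults, jobGraphWriter, jobResultStore) ->
                service;
    }
}
```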

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
##########
@@ -37,6 +40,16 @@
 
     // ------------------------------------------------------------------------
 
+    private static final BiMap<JobStatus, ApplicationStatus> JOB_STATUS_APPLICATION_STATUS_BI_MAP =
+            HashBiMap.create();

Review comment:
       ```suggestion
               EnumBiMap.create(JobStatus.class, ApplicationStatus.class);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.JobResultEntry;
+import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * An implementation of the {@link JobResultStore} which only persists the data to an in-memory map.
+ */
+public class EmbeddedJobResultStore implements JobResultStore {
+
+    private final Map<JobID, JobResultEntry> inMemoryMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void createDirtyResult(JobResult jobResult) {
+        final JobResultEntry jobResultEntry = JobResultEntry.createDirtyJobResultEntry(jobResult);
+        inMemoryMap.put(jobResult.getJobId(), jobResultEntry);
+    }
+
+    @Override
+    public void markResultAsClean(JobID jobId) throws NoSuchElementException {

Review comment:
       If I remember correctly, we've agreed that the implementation should be thread safe. This method might need to be changed to reflect that.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultEntry.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An entry in a {@link JobResultStore} that couples a completed {@link JobResult} to a state that
+ * represents whether the resources of that JobResult have been finalized ({@link
+ * JobResultState#CLEAN}) or have yet to be finalized ({@link JobResultState#DIRTY}).
+ */
+public class JobResultEntry {
+
+    private final JobResult jobResult;
+    private JobResultState state;

Review comment:
       Can we make this class immutable to avoid possible concurrency problems (e.g., a `toClean()` method that returns a new, clean instance)?
   
   For example, this currently leads to an incorrect implementation in the `EmbeddedJobResultStore`, which was supposed to be thread safe.
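   A sketch of the immutable variant, assuming a `toClean()` method as suggested and reducing `JobResult` and `JobID` to `String`s. The class names are hypothetical. Because entries never change after construction, `ConcurrentHashMap.compute` can swap a DIRTY entry for a CLEAN one atomically per key. Throwing `NoSuchElementException` for an already-clean entry is one reading of the `markResultAsClean` javadoc ("no corresponding dirty job"), not necessarily the intended contract.

```java
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

enum JobResultState { DIRTY, CLEAN }

// Immutable entry: state transitions return a new instance instead of mutating shared state.
// The job result is reduced to a String here; the real class wraps a JobResult.
final class JobResultEntry {
    private final String jobResult;
    private final JobResultState state;

    private JobResultEntry(String jobResult, JobResultState state) {
        this.jobResult = jobResult;
        this.state = state;
    }

    static JobResultEntry createDirty(String jobResult) {
        return new JobResultEntry(jobResult, JobResultState.DIRTY);
    }

    JobResultEntry toClean() {
        return new JobResultEntry(jobResult, JobResultState.CLEAN);
    }

    String getJobResult() {
        return jobResult;
    }

    boolean isDirty() {
        return state == JobResultState.DIRTY;
    }
}

// In-memory store keyed by job ID; compute() replaces an entry atomically for a given key.
final class InMemoryJobResultStoreSketch {
    private final Map<String, JobResultEntry> entries = new ConcurrentHashMap<>();

    void createDirtyResult(String jobId, String jobResult) {
        entries.put(jobId, JobResultEntry.createDirty(jobResult));
    }

    void markResultAsClean(String jobId) {
        entries.compute(
                jobId,
                (id, entry) -> {
                    // Assumption: "no dirty entry" covers both a missing and an already-clean entry.
                    if (entry == null || !entry.isDirty()) {
                        throw new NoSuchElementException("No dirty job result for job " + id);
                    }
                    return entry.toClean();
                });
    }

    Collection<String> getDirtyResults() {
        return entries.values().stream()
                .filter(JobResultEntry::isDirty)
                .map(JobResultEntry::getJobResult)
                .collect(Collectors.toList());
    }
}
```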

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -538,56 +552,50 @@ public void testDispatcherTerminationWaitsForJobMasterTerminations() throws Exce
         dispatcherTerminationFuture.get();
     }
 
-    private static final class SingleRunningJobsRegistry implements RunningJobsRegistry {
+    private static final class SingleJobResultStore implements JobResultStore {
 
         @Nonnull private final JobID expectedJobId;

Review comment:
       We should get rid of `@Nonnull`, as we implicitly treat all references as non-null unless marked otherwise.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -156,6 +158,7 @@ public Dispatcher(
             RpcService rpcService,
             DispatcherId fencingToken,
             Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> globallyTerminatedJobs,

Review comment:
       For now, this is unused. I assume that we'll use it only for recovery (we'll only query the collection). Maybe something along the lines of `recoveredDirtyJobs` would be a better name?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStoreTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.highavailability.JobResultEntry;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the {@link EmbeddedJobResultStore}. */
+public class EmbeddedJobResultStoreTest extends TestLogger {

Review comment:
       nit: JUnit 5 + AssertJ

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
##########
@@ -65,7 +68,14 @@ public void onStart() throws Exception {
     }
 
     void completeJobExecution(ExecutionGraphInfo executionGraphInfo) {
-        runAsync(() -> jobReachedTerminalState(executionGraphInfo));
+        runAsync(
+                () -> {
+                    try {
+                        jobReachedTerminalState(executionGraphInfo);
+                    } catch (Exception e) {
+                        e.printStackTrace();

Review comment:
       Leftover?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -935,6 +942,19 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
 
         archiveExecutionGraph(executionGraphInfo);
 
+        if (terminalJobStatus.isGloballyTerminalState()) {
+            try {
+                jobResultStore.createDirtyResult(
+                        JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
+            } catch (IOException e) {
+                log.error(

Review comment:
       Do we need proper error handling here?
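   One possible shape for this, sketched with a hypothetical `DirtyResultWriter` stand-in. Instead of logging and continuing, the failure is surfaced as an exceptionally completed future, so the caller can decide whether to retry or escalate (e.g. to the fatal error handler). Presumably a missing dirty entry means the job would not be recognized as globally terminated after a failover, which is why silently continuing seems risky; that reasoning is an assumption.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for JobResultStore#createDirtyResult, which may throw IOException.
@FunctionalInterface
interface DirtyResultWriter {
    void createDirtyResult(String jobResult) throws IOException;
}

final class TerminalStateHandling {
    private TerminalStateHandling() {}

    // Surfaces an IO failure to the caller as an exceptionally completed future
    // instead of only logging it.
    static CompletableFuture<Void> persistDirtyResult(DirtyResultWriter writer, String jobResult) {
        try {
            writer.createDirtyResult(jobResult);
            return CompletableFuture.completedFuture(null);
        } catch (IOException e) {
            final CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }
}
```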

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobPersistenceComponentFactory.java
##########
@@ -18,13 +18,22 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-/** Factory for {@link JobGraphStore}. */
-public interface JobGraphStoreFactory {
+import org.apache.flink.runtime.highavailability.JobResultStore;
+
+/** Factory for components that are responsible for persisting a job for recovery. */
+public interface JobPersistenceComponentFactory {
 
     /**
      * Creates a {@link JobGraphStore}.
      *
      * @return a {@link JobGraphStore} instance
      */
-    JobGraphStore create();
+    JobGraphStore createJobGraphStore();
+
+    /**
+     * Creates {@link JobResultStore} instances.
+     *
+     * @return {@code JobResultStore} instances.

Review comment:
       nit:
   ```suggestion
        * @return a {@link JobResultStore} instance.
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
##########
@@ -244,11 +244,13 @@ public void cleanupJobData(JobID jobID) throws Exception {
     protected abstract JobGraphStore createJobGraphStore() throws Exception;
 
     /**
-     * Create the registry that holds information about whether jobs are currently running.
+     * Create the store that holds completed job results.
      *
-     * @return Running job registry to retrieve running jobs
+     * @return Job result store to retrieve completed jobs
      */
-    protected abstract RunningJobsRegistry createRunningJobsRegistry();
+    protected JobResultStore createJobResultStore() {
+        return new EmbeddedJobResultStore();

Review comment:
       Should we leave this method abstract, just to force the implementations to provide one?
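   A sketch of the abstract variant with hypothetical stand-in types. Each HA services implementation then has to choose its store explicitly, even if it picks the in-memory one, instead of silently inheriting a non-persistent default.

```java
// Hypothetical stand-ins for JobResultStore and EmbeddedJobResultStore.
interface JobResultStore {}

final class InMemoryJobResultStore implements JobResultStore {}

abstract class AbstractHaServicesSketch {
    // No default body: every subclass must decide which store it uses.
    protected abstract JobResultStore createJobResultStore();
}

final class ExampleHaServices extends AbstractHaServicesSketch {
    @Override
    protected JobResultStore createJobResultStore() {
        // Choosing the in-memory store is still possible, but now it is an explicit decision.
        return new InMemoryJobResultStore();
    }
}
```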

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -211,7 +214,14 @@ public void testJobSubmission() throws Exception {
 
     @Test
     public void testDuplicateJobSubmissionWithGloballyTerminatedJobId() throws Exception {
-        haServices.getRunningJobsRegistry().setJobFinished(jobGraph.getJobID());
+        JobResult jobResult =

Review comment:
       nit
   ```suggestion
           final JobResult jobResult =
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
##########
@@ -235,8 +235,7 @@ private void runCleanupTestWithJob(
             final LeaderElectionService jobManagerLeaderElectionService =
                     zooKeeperHaServices.getJobManagerLeaderElectionService(jobId);
 
-            final RunningJobsRegistry runningJobsRegistry =
-                    zooKeeperHaServices.getRunningJobsRegistry();
+            final JobResultStore jobResultStore = zooKeeperHaServices.getJobResultStore();

Review comment:
       This is unused. Should we also incorporate the dirty -> clean transition into the lifecycle?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
##########
@@ -662,15 +663,21 @@ public void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture(
 
     @Test
     public void testJobAlreadyDone() throws Exception {
-        JobID jobID = new JobID();
+        JobID jobId = new JobID();
+        JobResult jobResult =

Review comment:
       nit
   ```suggestion
           final JobID jobId = new JobID();
           final JobResult jobResult =
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
##########
@@ -160,7 +168,7 @@ public void closeAsync_stopsJobGraphStoreAndDispatcher() throws Exception {
         dispatcherServiceFactory =
                 TestingDispatcherServiceFactory.newBuilder()
                         .setCreateFunction(
-                                (ignoredA, ignoredB, ignoredC) ->
+                                (ignoredA, ignoredB, ignoredC, ignoredD, ignoredE) ->

Review comment:
       See the comments on `QuintFunction`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
##########
@@ -180,7 +179,17 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception {
                     createDispatcherRunner(
                             rpcService,
                             dispatcherLeaderElectionService,
-                            () -> createZooKeeperJobGraphStore(client),
+                            new JobPersistenceComponentFactory() {

Review comment:
       Again, `new TestingJobPersistenceComponentFactory(createZooKeeperJobGraphStore(client), new EmbeddedJobResultStore())` might be nice to have.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.JobResultEntry;
+import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * An implementation of the {@link JobResultStore} which only persists the data to an in-memory map.
+ */
+public class EmbeddedJobResultStore implements JobResultStore {
+
+    private final Map<JobID, JobResultEntry> inMemoryMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void createDirtyResult(JobResult jobResult) {
+        final JobResultEntry jobResultEntry = JobResultEntry.createDirtyJobResultEntry(jobResult);
+        inMemoryMap.put(jobResult.getJobId(), jobResultEntry);
+    }
+
+    @Override
+    public void markResultAsClean(JobID jobId) throws NoSuchElementException {

Review comment:
       If that's correct, we may want to add a note about thread safety to the javadoc of the JRS interface.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultEntry.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An entry in a {@link JobResultStore} that couples a completed {@link JobResult} to a state that
+ * represents whether the resources of that JobResult have been finalized ({@link
+ * JobResultState#CLEAN}) or have yet to be finalized ({@link JobResultState#DIRTY}).
+ */
+public class JobResultEntry {
+
+    private final JobResult jobResult;
+    private JobResultState state;

Review comment:
       In general I'd really try to avoid mutable classes where possible, so we don't have to think too much about thread safety in the future.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.NoSuchElementException;
+
+/**
+ * A persistent storage mechanism for the results of successfully and unsuccessfully completed jobs.
+ */
+public interface JobResultStore {
+
+    /**
+     * Create a job result of a completed job. The initial state of a job result is always marked as
+     * DIRTY, which indicates that clean-up operations still need to be performed. Once the job
+     * resource cleanup has been finalized, we can "commit" the job result as a CLEAN result using
+     * {@link #markResultAsClean(JobID)}.
+     *
+     * @param jobResult The job result we wish to persist.
+     * @throws IOException if the creation of the dirty result failed for IO reasons.
+     */
+    void createDirtyResult(JobResult jobResult) throws IOException;
+
+    /**
+     * Marks an existing job result as CLEAN. This indicates that no more resource cleanup steps
+     * need to be performed.
+     *
+     * @param jobId Ident of the job we wish to mark as clean.
+     * @throws IOException if marking the dirty result as cleaned failed for IO reasons.
+     * @throws NoSuchElementException if there is no corresponding dirty job present in the store
+     *     for the given JobID.
+     */
+    void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException;
+
+    /**
+     * Returns whether the store already contains an entry for a job.
+     *
+     * @param jobId Ident of the job we wish to check the store for.
+     * @return A boolean for whether the job result store contains an entry for the given {@link
+     *     JobID}
+     * @throws IOException if determining whether a job entry is present in the store failed for IO
+     *     reasons.
+     */
+    boolean hasJobResultEntry(JobID jobId) throws IOException;
+
+    /**
+     * Get all persisted {@link JobResult job results} that are marked as dirty. This is useful for
+     * recovery of finalization steps.
+     *
+     * @return A collection of dirty JobResults from the store.
+     * @throws IOException if collecting the set of dirty results failed for IO reasons.
+     */
+    Collection<JobResult> getDirtyResults() throws IOException;

Review comment:
       As @autophagy has pointed out, do we want to return a set here instead, to be explicit about the no-duplicate property of the returned value?



