Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/01/18 16:05:42 UTC

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add SerializableExecutionGrap...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5310

    [FLINK-8453] [flip6] Add SerializableExecutionGraphStore to Dispatcher

    ## What is the purpose of the change
    
    The SerializableExecutionGraphStore is responsible for storing completed jobs
    for historic job requests (e.g. from the web UI or from the client). The store
    is populated by the Dispatcher once a job has terminated.
    
    The FileSerializableExecutionGraphStore implementation persists all
    SerializableExecutionGraphs on disk in order to avoid OOM problems. It keeps
    only some of the stored graphs in memory, up to a configurable size. When the
    in-memory cache approaches this size, it evicts elements and reloads them from
    disk only if they are requested again. Additionally, the
    FileSerializableExecutionGraphStore defines an expiration time after which the
    execution graphs are removed from disk. This prevents excessive use of disk
    resources.
    
    This PR is based on #5309.
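
The caching behaviour described above (everything on disk, a size-bounded subset in memory, reload on demand) can be illustrated with a minimal sketch. This is not the PR's implementation, which builds on Guava caches; `DiskBackedStoreSketch`, its methods, and the use of `String` keys with `byte[]` values are all hypothetical simplifications, the class is not thread-safe, and time-based expiration is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: every value is written to disk, a byte-bounded LRU map holds hot entries. */
final class DiskBackedStoreSketch {
    private final Path dir;
    private final long maxCachedBytes;
    private long cachedBytes;
    // Access-ordered map: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, byte[]> cache = new LinkedHashMap<>(16, 0.75f, true);

    DiskBackedStoreSketch(Path dir, long maxCachedBytes) {
        this.dir = dir;
        this.maxCachedBytes = maxCachedBytes;
    }

    /** Persists the value first, then keeps it in memory if the budget allows. */
    void put(String key, byte[] value) {
        try {
            Files.write(dir.resolve(key), value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        remember(key, value);
    }

    /** Returns the value, reloading it from disk on a cache miss; null for unknown keys. */
    byte[] get(String key) {
        byte[] cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        Path file = dir.resolve(key);
        if (!Files.exists(file)) {
            return null;
        }
        try {
            byte[] loaded = Files.readAllBytes(file);
            remember(key, loaded);
            return loaded;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isCached(String key) {
        return cache.containsKey(key);
    }

    /** Adds the entry and evicts least recently used entries until the byte budget holds. */
    private void remember(String key, byte[] value) {
        byte[] previous = cache.put(key, value);
        if (previous != null) {
            cachedBytes -= previous.length;
        }
        cachedBytes += value.length;
        Iterator<Map.Entry<String, byte[]>> it = cache.entrySet().iterator();
        while (cachedBytes > maxCachedBytes && it.hasNext()) {
            cachedBytes -= it.next().getValue().length;
            it.remove();
        }
    }
}
```

An evicted entry stays on disk, so a later `get` still succeeds and only pays the cost of reading the file again.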
    
    ## Brief change log
    
    - Introduce `SerializableExecutionGraphStore` and `FileSerializableExecutionGraphStore`
    - Add `FileSerializableExecutionGraphStore` to `Dispatcher`
    - Store `SerializableExecutionGraphs` in corresponding `FileSerializableExecutionGraphStore`
    - Adapt `Dispatcher` to serve requests for historic jobs
    
    ## Verifying this change
    
    - Added `FileSerializableExecutionGraphStoreTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    
    cc @GJL 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink addHistoricJobView

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5310
    
----
commit a959b9411833e320065b328ed2fc936b58f911f4
Author: Till Rohrmann <tr...@...>
Date:   2018-01-16T17:45:53Z

    [FLINK-8449] [flip6] Extend OnCompletionActions to accept a SerializableExecutionGraph
    
    This commit introduces the SerializableExecutionGraph which extends the
    AccessExecutionGraph and adds serializability to it. Moreover, this commit
    changes the OnCompletionActions interface such that it accepts a
    SerializableExecutionGraph instead of a plain JobResult. This allows the
    completed ExecutionGraph to be archived for further use in the container
    component of the JobMasterRunner.

commit ca15b076c05ff940a12a240ba385e2434f93790b
Author: Till Rohrmann <tr...@...>
Date:   2018-01-18T14:02:36Z

    [hotfix] [tests] Let BucketingSink extend TestLogger

commit 21c25502fb6d07c6fb65f18100dc6d4ec23e9d93
Author: Till Rohrmann <tr...@...>
Date:   2018-01-17T14:01:57Z

    [FLINK-8450] [flip6] Make JobMaster/DispatcherGateway#requestJob type safe
    
    Let JobMasterGateway#requestJob and DispatcherGateway#requestJob return a
    CompletableFuture<SerializableExecutionGraph> instead of a
    CompletableFuture<AccessExecutionGraph>. In order to support the old code
    and the JobManagerGateway implementation we have to keep the return type
    in RestfulGateway. Once the old code has been removed, we should change
    this as well.

commit 7b7b0692582189b8e540e5ae022d351c45991e43
Author: Till Rohrmann <tr...@...>
Date:   2018-01-17T11:22:43Z

    [FLINK-8453] [flip6] Add SerializableExecutionGraphStore to Dispatcher
    
    The SerializableExecutionGraphStore is responsible for storing completed jobs
    for historic job requests (e.g. from the web UI or from the client). The store
    is populated by the Dispatcher once a job has terminated.
    
    The FileSerializableExecutionGraphStore implementation persists all
    SerializableExecutionGraphs on disk in order to avoid OOM problems. It keeps
    only some of the stored graphs in memory, up to a configurable size. When the
    in-memory cache approaches this size, it evicts elements and reloads them from
    disk only if they are requested again. Additionally, the
    FileSerializableExecutionGraphStore defines an expiration time after which the
    execution graphs are removed from disk. This prevents excessive use of disk
    resources.

----


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r163922121
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +
    +import org.hamcrest.BaseMatcher;
    +import org.hamcrest.Description;
    +import org.hamcrest.Matchers;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.stream.Collectors;
    +
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Tests for the {@link FileArchivedExecutionGraphStore}.
    + */
    +@Category(Flip6.class)
    +public class FileArchivedExecutionGraphStoreTest extends TestLogger {
    +
    +	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
    +
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	@BeforeClass
    +	public static void setup() {
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
    +	}
    +
    +	/**
    +	 * Tests that we can put {@link ArchivedExecutionGraph} into the
    +	 * {@link FileArchivedExecutionGraphStore} and that the graph is persisted.
    +	 */
    +	@Test
    +	public void testPut() throws IOException {
    +		final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +
    +			final File storageDirectory = executionGraphStore.getStorageDir();
    +
    +			// check that the storage directory is empty
    +			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
    +
    +			executionGraphStore.put(dummyExecutionGraph);
    +
    +			// check that we have persisted the given execution graph
    +			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
    +
    +			assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
    +		}
    +	}
    +
    +	/**
    +	 * Tests that null is returned if we request an unknown JobID.
    +	 */
    +	@Test
    +	public void testUnknownGet() throws IOException {
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +			assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue());
    +		}
    +	}
    +
    +	/**
    +	 * Tests that we obtain the correct jobs overview.
    +	 */
    +	@Test
    +	public void testStoredJobsOverview() throws IOException {
    +		final int numberExecutionGraphs = 10;
    +		final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs);
    +
    +		final List<JobStatus> jobStatuses = executionGraphs.stream().map(ArchivedExecutionGraph::getState).collect(Collectors.toList());
    +
    +		final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses);
    +
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +			for (ArchivedExecutionGraph executionGraph : executionGraphs) {
    +				executionGraphStore.put(executionGraph);
    +			}
    +
    +			assertThat(executionGraphStore.getStoredJobsOverview(), Matchers.equalTo(expectedJobsOverview));
    +		}
    +	}
    +
    +	/**
    +	 * Tests that we obtain the correct collection of available job details.
    +	 */
    +	@Test
    +	public void testAvailableJobDetails() throws IOException {
    +		final int numberExecutionGraphs = 10;
    +		final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs);
    +
    +		final Collection<JobDetails> jobDetails = executionGraphs.stream().map(WebMonitorUtils::createDetailsForJob).collect(Collectors.toList());
    +
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +			for (ArchivedExecutionGraph executionGraph : executionGraphs) {
    +				executionGraphStore.put(executionGraph);
    +			}
    +
    +			assertThat(executionGraphStore.getAvailableJobDetails(), Matchers.containsInAnyOrder(jobDetails.toArray()));
    +		}
    +	}
    +
    +	/**
    +	 * Tests that an expired execution graph is removed from the execution graph store.
    +	 */
    +	@Test
    +	public void testExecutionGraphExpiration() throws Exception {
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		final Time expirationTime = Time.milliseconds(1L);
    +
    +		final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = new FileArchivedExecutionGraphStore(
    +			rootDir,
    +			expirationTime,
    +			10000L,
    +			scheduledExecutor)) {
    +
    +			final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
    +
    +			executionGraphStore.put(executionGraph);
    +
    +			// there should be one execution graph
    +			assertThat(executionGraphStore.size(), Matchers.equalTo(1));
    +
    +			Thread.sleep(expirationTime.toMilliseconds());
    --- End diff --
    
    Isn't it possible to use a `Ticker` to test the time-based eviction? https://github.com/google/guava/wiki/CachesExplained#testing-timed-eviction
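
To illustrate the suggestion: Guava's `CacheBuilder` accepts a time source through `ticker(Ticker)`, so a test can advance a fake clock instead of calling `Thread.sleep`. The sketch below shows the same idea with JDK classes only, so that it runs without the shaded Guava dependency. `ExpiringMapSketch` and `ManualClockSketch` are hypothetical names, not part of Flink or Guava, and the store in this PR would need a way to pass a ticker into its `CacheBuilder` before a test could use the technique.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a cache with expire-after-write semantics. */
final class ExpiringMapSketch<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> writeTimeNanos = new HashMap<>();
    private final long ttlNanos;
    private final LongSupplier nanoClock; // injected time source, playing the role of a Ticker

    ExpiringMapSketch(long ttl, TimeUnit unit, LongSupplier nanoClock) {
        this.ttlNanos = unit.toNanos(ttl);
        this.nanoClock = nanoClock;
    }

    void put(K key, V value) {
        values.put(key, value);
        writeTimeNanos.put(key, nanoClock.getAsLong());
    }

    /** Removes every entry that was written at least one TTL ago. */
    void cleanUp() {
        final long now = nanoClock.getAsLong();
        writeTimeNanos.entrySet().removeIf(entry -> {
            boolean expired = now - entry.getValue() >= ttlNanos;
            if (expired) {
                values.remove(entry.getKey());
            }
            return expired;
        });
    }

    int size() {
        cleanUp();
        return values.size();
    }
}

/** Clock that only moves when the test advances it. */
final class ManualClockSketch implements LongSupplier {
    private long nanos;

    @Override
    public long getAsLong() {
        return nanos;
    }

    void advance(long duration, TimeUnit unit) {
        nanos += unit.toNanos(duration);
    }
}
```

A test would put one entry, check that the size is 1, call `advance(1, TimeUnit.MILLISECONDS)` on the clock, and then check that the size is 0. The outcome no longer depends on how long the thread actually slept.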


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r163919241
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Tests for the {@link FileArchivedExecutionGraphStore}.
    + */
    +@Category(Flip6.class)
    +public class FileArchivedExecutionGraphStoreTest extends TestLogger {
    +
    +	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
    +
    +	private static final Random RANDOM = new Random();
    --- End diff --
    
    With `ThreadLocalRandom.current().nextInt(...)` you already have an available random instance which does not suffer from lock contention problems.
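
A minimal sketch of that suggestion, assuming the random instance is only used to pick one of the terminal states. The `Status` enum is a hypothetical stand-in for Flink's `JobStatus`, and `randomTerminalStatus` is an illustrative helper name rather than a method from the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class RandomStatusSketch {
    // Hypothetical stand-in for the globally terminal JobStatus values used by the test.
    enum Status { FINISHED, FAILED, CANCELED }

    static final List<Status> TERMINAL = Arrays.asList(Status.values());

    /** Picks a terminal status without keeping a shared Random field around. */
    static Status randomTerminalStatus() {
        int index = ThreadLocalRandom.current().nextInt(TERMINAL.size());
        return TERMINAL.get(index);
    }
}
```

Each thread gets its own generator from `ThreadLocalRandom.current()`, so the static `RANDOM` field can be dropped.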


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r164058010
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Tests for the {@link FileArchivedExecutionGraphStore}.
    + */
    +@Category(Flip6.class)
    +public class FileArchivedExecutionGraphStoreTest extends TestLogger {
    +
    +	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
    --- End diff --
    
    You're right, will change it.


---

[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5310
  
    👍 


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r163915273
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.blob.BlobUtils;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
    + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
    + * the stored execution graphs are periodically cleaned up.
    + */
    +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
    +
    +	private final File storageDir;
    +
    +	private final Cache<JobID, JobDetails> jobDetailsCache;
    +
    +	private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
    +
    +	private final ScheduledFuture<?> cleanupFuture;
    +
    +	private final Thread shutdownHook;
    +
    +	private int numFinishedJobs;
    +
    +	private int numFailedJobs;
    +
    +	private int numCanceledJobs;
    +
    +	public FileArchivedExecutionGraphStore(
    +			File rootDir,
    +			Time expirationTime,
    +			long maximumCacheSizeBytes,
    +			ScheduledExecutor scheduledExecutor) throws IOException {
    +
    +		final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
    +
    +		LOG.info(
    +			"Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
    +			FileArchivedExecutionGraphStore.class.getSimpleName(),
    +			storageDirectory,
    +			expirationTime.toMilliseconds(),
    +			maximumCacheSizeBytes);
    +
    +		this.storageDir = Preconditions.checkNotNull(storageDirectory);
    +		Preconditions.checkArgument(
    +			storageDirectory.exists() && storageDirectory.isDirectory(),
    +			"The storage directory must exist and be a directory.");
    +		this.jobDetailsCache = CacheBuilder.newBuilder()
    +			.expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS)
    +			.removalListener(
    +				(RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey()))
    +			.build();
    +
    +		this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
    +			.maximumWeight(maximumCacheSizeBytes)
    +			.weigher(this::calculateSize)
    +			.build(new CacheLoader<JobID, ArchivedExecutionGraph>() {
    +				@Override
    +				public ArchivedExecutionGraph load(JobID jobId) throws Exception {
    +					return loadExecutionGraph(jobId);
    +				}});
    +
    +		this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
    +			jobDetailsCache::cleanUp,
    +			expirationTime.toMilliseconds(),
    +			expirationTime.toMilliseconds(),
    +			TimeUnit.MILLISECONDS);
    +
    +		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +
    +		this.numFinishedJobs = 0;
    +		this.numFailedJobs = 0;
    +		this.numCanceledJobs = 0;
    +	}
    +
    +	@Override
    +	public int size() {
    +		return Math.toIntExact(jobDetailsCache.size());
    +	}
    +
    +	@Override
    +	@Nullable
    +	public ArchivedExecutionGraph get(JobID jobId) {
    +		try {
    +			return archivedExecutionGraphCache.get(jobId);
    +		} catch (ExecutionException e) {
    +			LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
    +			return null;
    +		}
    +	}
    +
    +	@Override
    +	public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    +		final JobStatus jobStatus = archivedExecutionGraph.getState();
    +		final JobID jobId = archivedExecutionGraph.getJobID();
    +		final String jobName = archivedExecutionGraph.getJobName();
    +
    +		Preconditions.checkArgument(
    +			jobStatus.isGloballyTerminalState(),
    +			"The job " + jobName + '(' + jobId +
    +				") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');
    +
    +		switch (jobStatus) {
    +			case FINISHED:
    +				numFinishedJobs++;
    +				break;
    +			case CANCELED:
    +				numCanceledJobs++;
    +				break;
    +			case FAILED:
    +				numFailedJobs++;
    +				break;
    +			default:
    +				throw new IllegalStateException("The job " + jobName + '(' +
    +					jobId + ") should have been in a globally terminal state. " +
    +					"Instead it was in state " + jobStatus + '.');
    +		}
    +
    +		// write the ArchivedExecutionGraph to disk
    +		storeArchivedExecutionGraph(archivedExecutionGraph);
    +
    +		final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
    +
    +		jobDetailsCache.put(jobId, detailsForJob);
    +		archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
    +	}
    +
    +	@Override
    +	public JobsOverview getStoredJobsOverview() {
    +		return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
    +	}
    +
    +	@Override
    +	public Collection<JobDetails> getAvailableJobDetails() {
    +		return jobDetailsCache.asMap().values();
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		cleanupFuture.cancel(false);
    +
    +		jobDetailsCache.invalidateAll();
    +
    +		// clean up the storage directory
    +		FileUtils.deleteFileOrDirectory(storageDir);
    +
    +		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
    +		// shutdown hook itself
    +		if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
    +			try {
    +				Runtime.getRuntime().removeShutdownHook(shutdownHook);
    +			}
    +			catch (IllegalStateException e) {
    +				// race, JVM is in shutdown already, we can safely ignore this
    +			}
    +			catch (Throwable t) {
    +				LOG.warn("Exception while unregistering FileArchivedExecutionGraphStore's cleanup shutdown hook.", t);
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------
    +	// Internal methods
    +	// --------------------------------------------------------------
    +
    +	private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		if (archivedExecutionGraphFile.exists()) {
    +			return Math.toIntExact(archivedExecutionGraphFile.length());
    +		} else {
    +			LOG.debug("Could not find archived execution graph file for {}. Estimating the size instead.", jobId);
    +			return serializableExecutionGraph.getAllVertices().size() * 1000 +
    +				serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
    +		}
    +	}
    +
    +	private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		if (archivedExecutionGraphFile.exists()) {
    +			try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
    +				return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
    +			}
    +		} else {
    +			throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
    +				". This indicates that the file either has been deleted or never written.");
    +		}
    +	}
    +
    +	private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID());
    +
    +		try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
    +			InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
    +		}
    +	}
    +
    +	private File getExecutionGraphFile(JobID jobId) {
    +		return new File(storageDir, jobId.toString());
    +	}
    +
    +	private void deleteExecutionGraphFile(JobID jobId) {
    +		Preconditions.checkNotNull(jobId);
    +
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		try {
    +			FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
    +		} catch (IOException e) {
    +			LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
    +		}
    +
    +		archivedExecutionGraphCache.invalidate(jobId);
    +		jobDetailsCache.invalidate(jobId);
    +	}
    +
    +	private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
    +		final int maxAttempts = 10;
    +
    +		for (int attempt = 0; attempt < maxAttempts; attempt++) {
    --- End diff --
    
    Why do you expect it to fail sometimes?


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r164058994
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +
    +import org.hamcrest.BaseMatcher;
    +import org.hamcrest.Description;
    +import org.hamcrest.Matchers;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.stream.Collectors;
    +
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Tests for the {@link FileArchivedExecutionGraphStore}.
    + */
    +@Category(Flip6.class)
    +public class FileArchivedExecutionGraphStoreTest extends TestLogger {
    +
    +	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
    +
    +	private static final Random RANDOM = new Random();
    --- End diff --
    
    Good point. Will change it.


---

[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5310
  
    Is it acceptable that graphs sometimes don't get deleted from disk?


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r163921620
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Iterator;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.Delayed;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Simple {@link ScheduledExecutor} implementation for testing purposes.
    + */
    +public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor {
    +
    +	private final ConcurrentLinkedQueue<ScheduledTask<?>> scheduledTasks = new ConcurrentLinkedQueue<>();
    +
    +	@Override
    +	public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    +		return insertRunnable(command, false);
    +	}
    +
    +	@Override
    +	public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    +		final ScheduledTask<V> scheduledTask = new ScheduledTask<>(callable, false);
    +
    +		scheduledTasks.offer(scheduledTask);
    +
    +		return scheduledTask;
    +	}
    +
    +	@Override
    +	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    +		return insertRunnable(command, true);
    +	}
    +
    +	@Override
    +	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    +		return insertRunnable(command, true);
    +	}
    +
    +	/**
    +	 * Triggers all registered tasks.
    +	 */
    +	public void triggerScheduledTasks() {
    +		final Iterator<ScheduledTask<?>> iterator = scheduledTasks.iterator();
    +
    +		while (iterator.hasNext()) {
    +			final ScheduledTask<?> scheduledTask = iterator.next();
    +
    +			scheduledTask.execute();
    +
    +			if (!scheduledTask.isPeriodic) {
    +				iterator.remove();
    +			}
    +		}
    +	}
    +
    +	private ScheduledFuture<?> insertRunnable(Runnable command, boolean isPeriodic) {
    +		final ScheduledTask<?> scheduledTask = new ScheduledTask<>(
    +			() -> {
    +				command.run();
    +				return null;
    +			},
    +			isPeriodic);
    +
    +		scheduledTasks.offer(scheduledTask);
    +
    +		return scheduledTask;
    +	}
    +
    +	private static final class ScheduledTask<T> implements ScheduledFuture<T> {
    +
    +		private final Callable<T> callable;
    +
    +		private final boolean isPeriodic;
    +
    +		private final CompletableFuture<T> result;
    +
    +		private ScheduledTask(Callable<T> callable, boolean isPeriodic) {
    +			this.callable = Preconditions.checkNotNull(callable);
    +			this.isPeriodic = isPeriodic;
    +
    +			this.result = new CompletableFuture<>();
    +		}
    +
    +		public boolean isPeriodic() {
    --- End diff --
    
    nit: method is unused


---

[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5310
  
    Thanks for the review @GJL. Ideally we don't leave things around when they are no longer needed. Which scenario other than a hard crash have you spotted in which the graphs are not cleaned up?


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5310


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r164062610
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +
    +import org.hamcrest.BaseMatcher;
    +import org.hamcrest.Description;
    +import org.hamcrest.Matchers;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.stream.Collectors;
    +
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Tests for the {@link FileArchivedExecutionGraphStore}.
    + */
    +@Category(Flip6.class)
    +public class FileArchivedExecutionGraphStoreTest extends TestLogger {
    +
    +	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
    +
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	@BeforeClass
    +	public static void setup() {
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
    +	}
    +
    +	/**
    +	 * Tests that we can put {@link ArchivedExecutionGraph} into the
    +	 * {@link FileArchivedExecutionGraphStore} and that the graph is persisted.
    +	 */
    +	@Test
    +	public void testPut() throws IOException {
    +		final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +
    +			final File storageDirectory = executionGraphStore.getStorageDir();
    +
    +			// check that the storage directory is empty
    +			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
    +
    +			executionGraphStore.put(dummyExecutionGraph);
    +
    +			// check that we have persisted the given execution graph
    +			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
    +
    +			assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
    +		}
    +	}
    +
    +	/**
    +	 * Tests that null is returned if we request an unknown JobID.
    +	 */
    +	@Test
    +	public void testUnknownGet() throws IOException {
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +			assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue());
    +		}
    +	}
    +
    +	/**
    +	 * Tests that we obtain the correct jobs overview.
    +	 */
    +	@Test
    +	public void testStoredJobsOverview() throws IOException {
    +		final int numberExecutionGraphs = 10;
    +		final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs);
    +
    +		final List<JobStatus> jobStatuses = executionGraphs.stream().map(ArchivedExecutionGraph::getState).collect(Collectors.toList());
    +
    +		final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses);
    +
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +			for (ArchivedExecutionGraph executionGraph : executionGraphs) {
    +				executionGraphStore.put(executionGraph);
    +			}
    +
    +			assertThat(executionGraphStore.getStoredJobsOverview(), Matchers.equalTo(expectedJobsOverview));
    +		}
    +	}
    +
    +	/**
    +	 * Tests that we obtain the correct collection of available job details.
    +	 */
    +	@Test
    +	public void testAvailableJobDetails() throws IOException {
    +		final int numberExecutionGraphs = 10;
    +		final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs);
    +
    +		final Collection<JobDetails> jobDetails = executionGraphs.stream().map(WebMonitorUtils::createDetailsForJob).collect(Collectors.toList());
    +
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +			for (ArchivedExecutionGraph executionGraph : executionGraphs) {
    +				executionGraphStore.put(executionGraph);
    +			}
    +
    +			assertThat(executionGraphStore.getAvailableJobDetails(), Matchers.containsInAnyOrder(jobDetails.toArray()));
    +		}
    +	}
    +
    +	/**
    +	 * Tests that an expired execution graph is removed from the execution graph store.
    +	 */
    +	@Test
    +	public void testExecutionGraphExpiration() throws Exception {
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		final Time expirationTime = Time.milliseconds(1L);
    +
    +		final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = new FileArchivedExecutionGraphStore(
    +			rootDir,
    +			expirationTime,
    +			10000L,
    +			scheduledExecutor)) {
    +
    +			final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
    +
    +			executionGraphStore.put(executionGraph);
    +
    +			// there should be one execution graph
    +			assertThat(executionGraphStore.size(), Matchers.equalTo(1));
    +
    +			Thread.sleep(expirationTime.toMilliseconds());
    --- End diff --
    
    Will change it.


---

[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5310
  
    No, I only had hard crashes in mind. One could do a directory listing and delete old files.
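
    A minimal sketch of the directory-listing idea, not code from this PR. It assumes one file per archived graph directly inside the storage directory and uses the file's last-modified time as its age. The class `StaleGraphFileCleaner` and the method `deleteOlderThan` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration only; it is not part of the PR. */
final class StaleGraphFileCleaner {

    private StaleGraphFileCleaner() {}

    /**
     * Deletes every regular file directly inside storageDir whose last-modified
     * time lies before (now - maxAge) and returns the number of deleted files.
     */
    static int deleteOlderThan(Path storageDir, Duration maxAge, Instant now) throws IOException {
        final Instant cutoff = now.minus(maxAge);
        int deleted = 0;

        try (DirectoryStream<Path> files = Files.newDirectoryStream(storageDir)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)
                        && Files.getLastModifiedTime(file).toInstant().isBefore(cutoff)) {
                    // deleteIfExists tolerates a concurrent removal of the same file
                    if (Files.deleteIfExists(file)) {
                        deleted++;
                    }
                }
            }
        }

        return deleted;
    }
}
```

    Such a sweep could cover files left behind by a hard crash, provided the storage directory is found again after a restart.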


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r164057811
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.blob.BlobUtils;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
    + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
    + * the stored execution graphs are periodically cleaned up.
    + */
    +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
    +
    +	private final File storageDir;
    +
    +	private final Cache<JobID, JobDetails> jobDetailsCache;
    +
    +	private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
    +
    +	private final ScheduledFuture<?> cleanupFuture;
    +
    +	private final Thread shutdownHook;
    +
    +	private int numFinishedJobs;
    +
    +	private int numFailedJobs;
    +
    +	private int numCanceledJobs;
    +
    +	public FileArchivedExecutionGraphStore(
    +			File rootDir,
    +			Time expirationTime,
    +			long maximumCacheSizeBytes,
    +			ScheduledExecutor scheduledExecutor) throws IOException {
    +
    +		final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
    +
    +		LOG.info(
    +			"Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
    +			FileArchivedExecutionGraphStore.class.getSimpleName(),
    +			storageDirectory,
    +			expirationTime.toMilliseconds(),
    +			maximumCacheSizeBytes);
    +
    +		this.storageDir = Preconditions.checkNotNull(storageDirectory);
    +		Preconditions.checkArgument(
    +			storageDirectory.exists() && storageDirectory.isDirectory(),
    +			"The storage directory must exist and be a directory.");
    +		this.jobDetailsCache = CacheBuilder.newBuilder()
    +			.expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS)
    +			.removalListener(
    +				(RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey()))
    +			.build();
    +
    +		this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
    +			.maximumWeight(maximumCacheSizeBytes)
    +			.weigher(this::calculateSize)
    +			.build(new CacheLoader<JobID, ArchivedExecutionGraph>() {
    +				@Override
    +				public ArchivedExecutionGraph load(JobID jobId) throws Exception {
    +					return loadExecutionGraph(jobId);
    +				}});
    +
    +		this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
    +			jobDetailsCache::cleanUp,
    +			expirationTime.toMilliseconds(),
    +			expirationTime.toMilliseconds(),
    +			TimeUnit.MILLISECONDS);
    +
    +		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +
    +		this.numFinishedJobs = 0;
    +		this.numFailedJobs = 0;
    +		this.numCanceledJobs = 0;
    +	}
    +
    +	@Override
    +	public int size() {
    +		return Math.toIntExact(jobDetailsCache.size());
    +	}
    +
    +	@Override
    +	@Nullable
    +	public ArchivedExecutionGraph get(JobID jobId) {
    +		try {
    +			return archivedExecutionGraphCache.get(jobId);
    +		} catch (ExecutionException e) {
    +			LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
    +			return null;
    +		}
    +	}
    +
    +	@Override
    +	public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    +		final JobStatus jobStatus = archivedExecutionGraph.getState();
    +		final JobID jobId = archivedExecutionGraph.getJobID();
    +		final String jobName = archivedExecutionGraph.getJobName();
    +
    +		Preconditions.checkArgument(
    +			jobStatus.isGloballyTerminalState(),
    +			"The job " + jobName + '(' + jobId +
    +				") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');
    +
    +		switch (jobStatus) {
    +			case FINISHED:
    +				numFinishedJobs++;
    +				break;
    +			case CANCELED:
    +				numCanceledJobs++;
    +				break;
    +			case FAILED:
    +				numFailedJobs++;
    +				break;
    +			default:
    +				throw new IllegalStateException("The job " + jobName + '(' +
    +					jobId + ") should have been in a globally terminal state. " +
    +					"Instead it was in state " + jobStatus + '.');
    +		}
    +
    +		// write the ArchivedExecutionGraph to disk
    +		storeArchivedExecutionGraph(archivedExecutionGraph);
    +
    +		final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
    +
    +		jobDetailsCache.put(jobId, detailsForJob);
    +		archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
    +	}
    +
    +	@Override
    +	public JobsOverview getStoredJobsOverview() {
    +		return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
    +	}
    +
    +	@Override
    +	public Collection<JobDetails> getAvailableJobDetails() {
    +		return jobDetailsCache.asMap().values();
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		cleanupFuture.cancel(false);
    +
    +		jobDetailsCache.invalidateAll();
    +
    +		// clean up the storage directory
    +		FileUtils.deleteFileOrDirectory(storageDir);
    +
    +		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
    +		// shutdown hook itself
    +		if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
    +			try {
    +				Runtime.getRuntime().removeShutdownHook(shutdownHook);
    +			}
    +			catch (IllegalStateException e) {
    +				// race, JVM is in shutdown already, we can safely ignore this
    +			}
    +			catch (Throwable t) {
    +				LOG.warn("Exception while unregistering FileArchivedExecutionGraphStore's cleanup shutdown hook.", t);
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------
    +	// Internal methods
    +	// --------------------------------------------------------------
    +
    +	private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		if (archivedExecutionGraphFile.exists()) {
    +			return Math.toIntExact(archivedExecutionGraphFile.length());
    +		} else {
    +			LOG.debug("Could not find archived execution graph file for {}. Estimating the size instead.", jobId);
    +			return serializableExecutionGraph.getAllVertices().size() * 1000 +
    +				serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
    +		}
    +	}
    +
    +	private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		if (archivedExecutionGraphFile.exists()) {
    +			try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
    +				return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
    +			}
    +		} else {
    +			throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
    +				". This indicates that the file either has been deleted or never written.");
    +		}
    +	}
    +
    +	private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID());
    +
    +		try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
    +			InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
    +		}
    +	}
    +
    +	private File getExecutionGraphFile(JobID jobId) {
    +		return new File(storageDir, jobId.toString());
    +	}
    +
    +	private void deleteExecutionGraphFile(JobID jobId) {
    +		Preconditions.checkNotNull(jobId);
    +
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		try {
    +			FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
    +		} catch (IOException e) {
    +			LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
    +		}
    +
    +		archivedExecutionGraphCache.invalidate(jobId);
    +		jobDetailsCache.invalidate(jobId);
    +	}
    +
    +	private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
    +		final int maxAttempts = 10;
    +
    +		for (int attempt = 0; attempt < maxAttempts; attempt++) {
    --- End diff --
    
    Retrying slightly reduces the probability of failing because of a name collision, even though that probability never becomes `0`. This is similar to how we do it for the `BlobServer`, but strictly speaking it is most likely not necessary.
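
    A rough sketch of such a retry loop, under the assumption that each attempt picks a random UUID-based directory name. The remainder of `initExecutionGraphStorageDirectory` is not quoted in this thread, so the class `UniqueDirectories` and the method `createUniqueSubdirectory` are hypothetical and only illustrate the pattern.

```java
import java.io.File;
import java.io.IOException;
import java.util.UUID;

/** Hypothetical helper for illustration only; it is not the PR's implementation. */
final class UniqueDirectories {

    private UniqueDirectories() {}

    /** Tries up to maxAttempts random names and returns the first directory it could create. */
    static File createUniqueSubdirectory(File baseDir, String prefix, int maxAttempts) throws IOException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            final File candidate = new File(baseDir, prefix + UUID.randomUUID());

            // mkdirs() returns true only if this call created the directory, so an
            // already existing directory (a collision) leads to another attempt
            if (candidate.mkdirs()) {
                return candidate;
            }
        }

        throw new IOException("Could not create a unique directory in " + baseDir +
            " after " + maxAttempts + " attempts.");
    }
}
```

    With random UUIDs a collision is already extremely unlikely on the first attempt, which is why the retries are mostly a safety net.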


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r163911570
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) {
     		fatalErrorHandler.onFatalError(throwable);
     	}
     
    -	private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) {
    -		final JobResult jobResult = JobResult.createFrom(accessExecutionGraph);
    +	private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
    +		Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), "");
    --- End diff --
    
    The `errorMessage` is an empty string. Either leave it out completely or provide a meaningful message.
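
    One possible shape for a meaningful message, sketched without Flink on the classpath. The `Status` enum and the local `checkArgument` helper are hypothetical stand-ins for `JobStatus` and `Preconditions.checkArgument`, and the message wording is only a suggestion.

```java
/** Hypothetical, self-contained illustration; names do not come from the PR. */
final class TerminalStateCheck {

    /** Stand-in for the job status enum; only the terminal flag matters here. */
    enum Status {
        RUNNING(false), FINISHED(true), FAILED(true), CANCELED(true);

        private final boolean globallyTerminal;

        Status(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    private TerminalStateCheck() {}

    /** Stand-in precondition helper: throws IllegalArgumentException carrying the message. */
    static void checkArgument(boolean condition, String errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(errorMessage);
        }
    }

    /** Fails with a message that names the job and the offending state. */
    static void requireGloballyTerminal(String jobId, Status status) {
        checkArgument(
            status.isGloballyTerminalState(),
            "Job " + jobId + " is in state " + status + ", which is not globally terminal.");
    }
}
```

    A message of this kind tells whoever reads the stack trace which job and which state triggered the failure.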


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r163919921
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +
    +import org.hamcrest.BaseMatcher;
    +import org.hamcrest.Description;
    +import org.hamcrest.Matchers;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.stream.Collectors;
    +
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Tests for the {@link FileArchivedExecutionGraphStore}.
    + */
    +@Category(Flip6.class)
    +public class FileArchivedExecutionGraphStoreTest extends TestLogger {
    +
    +	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
    +
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	@BeforeClass
    +	public static void setup() {
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
    +		GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
    +	}
    +
    +	/**
    +	 * Tests that we can put {@link ArchivedExecutionGraph} into the
    +	 * {@link FileArchivedExecutionGraphStore} and that the graph is persisted.
    +	 */
    +	@Test
    +	public void testPut() throws IOException {
    +		final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
    +		final File rootDir = temporaryFolder.newFolder();
    +
    +		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
    +
    +			final File storageDirectory = executionGraphStore.getStorageDir();
    +
    +			// check that the storage directory is empty
    +			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
    +
    +			executionGraphStore.put(dummyExecutionGraph);
    +
    +			// check that we have persisted the given execution graph
    +			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
    +
    +			assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
    --- End diff --
    
    It is not obvious what the matcher is doing. How about wrapping it in a factory method with a descriptive name:
    `assertThat(..., isPredicateFulfilled(...))`
    
    ```
    	private static Matcher<ArchivedExecutionGraph> isPredicateFulfilled(ArchivedExecutionGraph archivedExecutionGraph) {
    		return new PartialArchivedExecutionGraphMatcher(archivedExecutionGraph);
    	}
    ```
    
    `isPredicateFulfilled` is only a placeholder and should be replaced with a better name.


---

[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5310
  
    I've rebased onto the latest master and addressed your comments @GJL with 6eb11dd.


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r163918048
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java ---
    @@ -0,0 +1,319 @@
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +
    +import org.hamcrest.BaseMatcher;
    +import org.hamcrest.Description;
    +import org.hamcrest.Matchers;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.stream.Collectors;
    +
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Tests for the {@link FileArchivedExecutionGraphStore}.
    + */
    +@Category(Flip6.class)
    +public class FileArchivedExecutionGraphStoreTest extends TestLogger {
    +
    +	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
    --- End diff --
    
    It looks like a constant, i.e., it shouldn't be mutable. It could be initialized in place:
    ```
    	// additionally requires java.util.Arrays and java.util.Collections to be imported
    	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = Collections.unmodifiableList(
    		Arrays.stream(JobStatus.values())
    			.filter(JobStatus::isGloballyTerminalState)
    			.collect(Collectors.toList()));
    ```
    Populating the list in `@BeforeClass` is not idiomatic imo.
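
    For illustration, a minimal self-contained sketch of the same idea. The enum `Status` and the class name `TerminalStatesSketch` are hypothetical stand-ins, not Flink classes; only the initialization pattern is the point. The list is built once in the field initializer and wrapped so that later mutation attempts fail.

    ```java
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;

    public class TerminalStatesSketch {

        // Hypothetical stand-in for a job status enum; only the shape matters here.
        public enum Status {
            RUNNING(false), FINISHED(true), FAILED(true), CANCELED(true), SUSPENDED(false);

            private final boolean globallyTerminal;

            Status(boolean globallyTerminal) {
                this.globallyTerminal = globallyTerminal;
            }

            public boolean isGloballyTerminalState() {
                return globallyTerminal;
            }
        }

        // Built once when the class is initialized; the wrapper rejects all mutation.
        public static final List<Status> GLOBALLY_TERMINAL = Collections.unmodifiableList(
            Arrays.stream(Status.values())
                .filter(Status::isGloballyTerminalState)
                .collect(Collectors.toList()));

        public static void main(String[] args) {
            System.out.println(GLOBALLY_TERMINAL);
            try {
                GLOBALLY_TERMINAL.add(Status.RUNNING);
            } catch (UnsupportedOperationException e) {
                System.out.println("list is read-only");
            }
        }
    }
    ```

    With this pattern no setup method is needed, and a test cannot accidentally modify the shared list.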



---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r164056776
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) {
     		fatalErrorHandler.onFatalError(throwable);
     	}
     
    -	private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) {
    -		final JobResult jobResult = JobResult.createFrom(accessExecutionGraph);
    +	private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
    +		Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), "");
    --- End diff --
    
    Good catch. Will write a proper error message.
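
    A minimal sketch of what a descriptive precondition message could look like. The helper `checkArgument` below is a hypothetical stand-in written for this example, and the message wording and the job id `job-42` are assumptions, not what was eventually committed.

    ```java
    public class PreconditionMessageSketch {

        // Hypothetical stand-in for a checkArgument-style precondition helper.
        public static void checkArgument(boolean condition, String errorMessage) {
            if (!condition) {
                throw new IllegalArgumentException(errorMessage);
            }
        }

        // Builds a message that names the job and the offending state,
        // so a failure can be diagnosed from the exception alone.
        public static String notTerminalMessage(String jobId, String state) {
            return "Job " + jobId + " is in state " + state
                + ", which is not a globally terminal state.";
        }

        public static void main(String[] args) {
            try {
                checkArgument(false, notTerminalMessage("job-42", "RUNNING"));
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }
    ```

    Compared with an empty string, the exception now carries enough context to tell which job violated the precondition and in which state it was.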


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r164057896
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java ---
    @@ -0,0 +1,167 @@
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Iterator;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.Delayed;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Simple {@link ScheduledExecutor} implementation for testing purposes.
    + */
    +public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor {
    +
    +	private final ConcurrentLinkedQueue<ScheduledTask<?>> scheduledTasks = new ConcurrentLinkedQueue<>();
    +
    +	@Override
    +	public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    +		return insertRunnable(command, false);
    +	}
    +
    +	@Override
    +	public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    +		final ScheduledTask<V> scheduledTask = new ScheduledTask<>(callable, false);
    +
    +		scheduledTasks.offer(scheduledTask);
    +
    +		return scheduledTask;
    +	}
    +
    +	@Override
    +	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    +		return insertRunnable(command, true);
    +	}
    +
    +	@Override
    +	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    +		return insertRunnable(command, true);
    +	}
    +
    +	/**
    +	 * Triggers all registered tasks.
    +	 */
    +	public void triggerScheduledTasks() {
    +		final Iterator<ScheduledTask<?>> iterator = scheduledTasks.iterator();
    +
    +		while (iterator.hasNext()) {
    +			final ScheduledTask<?> scheduledTask = iterator.next();
    +
    +			scheduledTask.execute();
    +
    +			if (!scheduledTask.isPeriodic) {
    +				iterator.remove();
    +			}
    +		}
    +	}
    +
    +	private ScheduledFuture<?> insertRunnable(Runnable command, boolean isPeriodic) {
    +		final ScheduledTask<?> scheduledTask = new ScheduledTask<>(
    +			() -> {
    +				command.run();
    +				return null;
    +			},
    +			isPeriodic);
    +
    +		scheduledTasks.offer(scheduledTask);
    +
    +		return scheduledTask;
    +	}
    +
    +	private static final class ScheduledTask<T> implements ScheduledFuture<T> {
    +
    +		private final Callable<T> callable;
    +
    +		private final boolean isPeriodic;
    +
    +		private final CompletableFuture<T> result;
    +
    +		private ScheduledTask(Callable<T> callable, boolean isPeriodic) {
    +			this.callable = Preconditions.checkNotNull(callable);
    +			this.isPeriodic = isPeriodic;
    +
    +			this.result = new CompletableFuture<>();
    +		}
    +
    +		public boolean isPeriodic() {
    --- End diff --
    
    Will remove it.


---

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r164087390
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java ---
    @@ -0,0 +1,298 @@
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.blob.BlobUtils;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
    + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
    + * the stored execution graphs are periodically cleaned up.
    + */
    +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
    +
    +	private final File storageDir;
    +
    +	private final Cache<JobID, JobDetails> jobDetailsCache;
    +
    +	private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
    +
    +	private final ScheduledFuture<?> cleanupFuture;
    +
    +	private final Thread shutdownHook;
    +
    +	private int numFinishedJobs;
    +
    +	private int numFailedJobs;
    +
    +	private int numCanceledJobs;
    +
    +	public FileArchivedExecutionGraphStore(
    +			File rootDir,
    +			Time expirationTime,
    +			long maximumCacheSizeBytes,
    +			ScheduledExecutor scheduledExecutor) throws IOException {
    +
    +		final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
    +
    +		LOG.info(
    +			"Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
    +			FileArchivedExecutionGraphStore.class.getSimpleName(),
    +			storageDirectory,
    +			expirationTime.toMilliseconds(),
    +			maximumCacheSizeBytes);
    +
    +		this.storageDir = Preconditions.checkNotNull(storageDirectory);
    +		Preconditions.checkArgument(
    +			storageDirectory.exists() && storageDirectory.isDirectory(),
    +			"The storage directory must exist and be a directory.");
    +		this.jobDetailsCache = CacheBuilder.newBuilder()
    +			.expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS)
    +			.removalListener(
    +				(RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey()))
    +			.build();
    +
    +		this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
    +			.maximumWeight(maximumCacheSizeBytes)
    +			.weigher(this::calculateSize)
    +			.build(new CacheLoader<JobID, ArchivedExecutionGraph>() {
    +				@Override
    +				public ArchivedExecutionGraph load(JobID jobId) throws Exception {
    +					return loadExecutionGraph(jobId);
    +				}});
    +
    +		this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
    +			jobDetailsCache::cleanUp,
    +			expirationTime.toMilliseconds(),
    +			expirationTime.toMilliseconds(),
    +			TimeUnit.MILLISECONDS);
    +
    +		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +
    +		this.numFinishedJobs = 0;
    +		this.numFailedJobs = 0;
    +		this.numCanceledJobs = 0;
    +	}
    +
    +	@Override
    +	public int size() {
    +		return Math.toIntExact(jobDetailsCache.size());
    +	}
    +
    +	@Override
    +	@Nullable
    +	public ArchivedExecutionGraph get(JobID jobId) {
    +		try {
    +			return archivedExecutionGraphCache.get(jobId);
    +		} catch (ExecutionException e) {
    +			LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
    +			return null;
    +		}
    +	}
    +
    +	@Override
    +	public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    +		final JobStatus jobStatus = archivedExecutionGraph.getState();
    +		final JobID jobId = archivedExecutionGraph.getJobID();
    +		final String jobName = archivedExecutionGraph.getJobName();
    +
    +		Preconditions.checkArgument(
    +			jobStatus.isGloballyTerminalState(),
    +			"The job " + jobName + '(' + jobId +
    +				") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');
    +
    +		switch (jobStatus) {
    +			case FINISHED:
    +				numFinishedJobs++;
    +				break;
    +			case CANCELED:
    +				numCanceledJobs++;
    +				break;
    +			case FAILED:
    +				numFailedJobs++;
    +				break;
    +			default:
    +				throw new IllegalStateException("The job " + jobName + '(' +
    +					jobId + ") should have been in a globally terminal state. " +
    +					"Instead it was in state " + jobStatus + '.');
    +		}
    +
    +		// write the ArchivedExecutionGraph to disk
    +		storeArchivedExecutionGraph(archivedExecutionGraph);
    +
    +		final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
    +
    +		jobDetailsCache.put(jobId, detailsForJob);
    +		archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
    +	}
    +
    +	@Override
    +	public JobsOverview getStoredJobsOverview() {
    +		return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
    +	}
    +
    +	@Override
    +	public Collection<JobDetails> getAvailableJobDetails() {
    +		return jobDetailsCache.asMap().values();
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		cleanupFuture.cancel(false);
    +
    +		jobDetailsCache.invalidateAll();
    +
    +		// clean up the storage directory
    +		FileUtils.deleteFileOrDirectory(storageDir);
    +
    +		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
    +		// shutdown hook itself
    +		if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
    +			try {
    +				Runtime.getRuntime().removeShutdownHook(shutdownHook);
    +			}
    +			catch (IllegalStateException e) {
    +				// race, JVM is in shutdown already, we can safely ignore this
    +			}
    +			catch (Throwable t) {
    +				LOG.warn("Exception while unregistering FileArchivedExecutionGraphStore's cleanup shutdown hook.", t);
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------
    +	// Internal methods
    +	// --------------------------------------------------------------
    +
    +	private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		if (archivedExecutionGraphFile.exists()) {
    +			return Math.toIntExact(archivedExecutionGraphFile.length());
    +		} else {
    +			LOG.debug("Could not find archived execution graph file for {}. Estimating the size instead.", jobId);
    +			return serializableExecutionGraph.getAllVertices().size() * 1000 +
    +				serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
    +		}
    +	}
    +
    +	private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		if (archivedExecutionGraphFile.exists()) {
    +			try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
    +				return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
    +			}
    +		} else {
    +			throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
    +				". This indicates that the file either has been deleted or never written.");
    +		}
    +	}
    +
    +	private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID());
    +
    +		try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
    +			InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
    +		}
    +	}
    +
    +	private File getExecutionGraphFile(JobID jobId) {
    +		return new File(storageDir, jobId.toString());
    +	}
    +
    +	private void deleteExecutionGraphFile(JobID jobId) {
    +		Preconditions.checkNotNull(jobId);
    +
    +		final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +		try {
    +			FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
    +		} catch (IOException e) {
    +			LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
    +		}
    +
    +		archivedExecutionGraphCache.invalidate(jobId);
    +		jobDetailsCache.invalidate(jobId);
    +	}
    +
    +	private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
    +		final int maxAttempts = 10;
    +
    +		for (int attempt = 0; attempt < maxAttempts; attempt++) {
    --- End diff --
    
    The UUID is generated using a cryptographically strong random number generator, so there should be enough entropy to make a collision practically impossible and the retry loop unnecessary. But I am also okay with leaving it to keep it consistent with BlobServer.
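
    A rough sketch of the variant without a retry loop, using only the JDK. The class name `StorageDirSketch`, the method name `createStorageDir` and the directory prefix `executionGraphStore-` are assumptions made for this example and are not taken from the PR.

    ```java
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.UUID;

    public class StorageDirSketch {

        // Creates a uniquely named sub-directory below rootDir in a single attempt.
        // Files.createDirectory throws FileAlreadyExistsException if the name is taken,
        // so an (extremely unlikely) UUID collision surfaces as an error instead of a retry.
        public static Path createStorageDir(Path rootDir) throws IOException {
            Path candidate = rootDir.resolve("executionGraphStore-" + UUID.randomUUID());
            return Files.createDirectory(candidate);
        }

        public static void main(String[] args) throws IOException {
            Path root = Files.createTempDirectory("store-sketch");
            Path first = createStorageDir(root);
            Path second = createStorageDir(root);
            System.out.println(first);
            System.out.println(second);
        }
    }
    ```

    The trade-off is the one mentioned above: the single-attempt version is shorter, while the loop matches the existing BlobServer code.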


---