You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2016/07/15 13:28:26 UTC

[GitHub] flink pull request #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on...

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2256

    [FLINK-4150] [runtime] Don't clean up BlobStore on BlobServer shut down

    The `BlobServer` acts as a local cache for uploaded BLOBs. The life-cycle of each BLOB is bound to the life-cycle of the `BlobServer`. If the BlobServer shuts down (on JobManager shut down), all local files will be removed.
    
    With HA, BLOBs are persisted to another file system (e.g. HDFS) via the `BlobStore` in order to have BLOBs available after a JobManager failure (or shut down). These BLOBs are only allowed to be removed when the job that requires them enters a globally terminal state (`FINISHED`, `CANCELLED`, `FAILED`).
    
    This commit removes the `BlobStore` clean up call from the `BlobServer` shutdown. The `BlobStore` files will only be cleaned up via the `BlobLibraryCacheManager`'s' clean up task (periodically or on BlobLibraryCacheManager shutdown). This means that there is a chance that BLOBs will linger around after the job has terminated, if the job manager fails before the clean up.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 4150-blobstore

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2256.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2256
    
----
commit 0d4522270881dbbb7164130f47f9d4df617c19c5
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-07-14T14:29:49Z

    [FLINK-4150] [runtime] Don't clean up BlobStore on BlobServer shut down
    
    The `BlobServer` acts as a local cache for uploaded BLOBs. The life-cycle of
    each BLOB is bound to the life-cycle of the `BlobServer`. If the BlobServer
    shuts down (on JobManager shut down), all local files will be removed.
    
    With HA, BLOBs are persisted to another file system (e.g. HDFS) via the
    `BlobStore` in order to have BLOBs available after a JobManager failure (or
    shut down). These BLOBs are only allowed to be removed when the job that
    requires them enters a globally terminal state (`FINISHED`, `CANCELLED`,
    `FAILED`).
    
    This commit removes the `BlobStore` clean up call from the `BlobServer`
    shutdown. The `BlobStore` files will only be cleaned up via the
    `BlobLibraryCacheManager`'s' clean up task (periodically or on
    BlobLibraryCacheManager shutdown). This means that there is a chance that
    BLOBs will linger around after the job has terminated, if the job manager
    fails before the clean up.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on BlobSe...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2256
  
    Just a quick question. Do we want to remove also failed jobs from the BlobStore and ZK? Or only finished or cancelled jobs?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on BlobSe...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2256
  
    I don't know if we "want to", but it is the current behaviour. A job should only fail if its restart strategy is exhausted though. Do you think we should change that behaviour? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on BlobSe...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2256
  
    Changes look good to me :-). Really good work @uce. I'm just wondering whether we could remove empty folders upon shutdown of the `BlobStore`. Apart from that, +1 for merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2256#discussion_r71510844
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
    @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that the persisted job is not removed from the job graph store
    +	 * after the postStop method of the JobManager. Furthermore, it checks
    +	 * that BLOBs of the JobGraph are recovered properly and cleaned up after
    +	 * the job finishes.
    +	 */
    +	@Test
    +	public void testBlobRecoveryAfterLostJobManager() throws Exception {
    +		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +		FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
    +		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
    +		Configuration flinkConfiguration = new Configuration();
    +		UUID leaderSessionID = UUID.randomUUID();
    +		UUID newLeaderSessionID = UUID.randomUUID();
    +		int slots = 2;
    +		ActorRef archiveRef = null;
    +		ActorRef jobManagerRef = null;
    +		ActorRef taskManagerRef = null;
    +
    +		String haStoragePath = temporaryFolder.newFolder().toString();
    +
    +		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
    +		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath);
    +		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
    +
    +		try {
    +			MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
    +			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
    +			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
    +
    +			archiveRef = system.actorOf(Props.create(
    +					MemoryArchivist.class,
    +					10), "archive");
    +
    +			jobManagerRef = createJobManagerActor(
    +					"jobmanager-0",
    +					flinkConfiguration,
    +					myLeaderElectionService,
    +					mySubmittedJobGraphStore,
    +					3600000,
    +					timeout,
    +					jobRecoveryTimeout, archiveRef);
    +
    +			ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID);
    +
    +			taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +					flinkConfiguration,
    +					ResourceID.generate(),
    +					system,
    +					"localhost",
    +					Option.apply("taskmanager"),
    +					Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
    +					true,
    +					TestingTaskManager.class);
    +
    +			ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID);
    +
    +			Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +			Await.ready(tmAlive, deadline.timeLeft());
    +
    +			JobVertex sourceJobVertex = new JobVertex("Source");
    +			sourceJobVertex.setInvokableClass(BlockingInvokable.class);
    +			sourceJobVertex.setParallelism(slots);
    +
    +			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
    +
    +			// Upload fake JAR file to first JobManager
    +			File jarFile = temporaryFolder.newFile();
    +			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));
    +			out.close();
    +
    +			jobGraph.addJar(new Path(jarFile.toURI()));
    +			JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft());
    +
    +			Future<Object> isLeader = jobManager.ask(
    +					TestingJobManagerMessages.getNotifyWhenLeader(),
    +					deadline.timeLeft());
    +
    +			Future<Object> isConnectedToJobManager = tmGateway.ask(
    +					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +					deadline.timeLeft());
    +
    +			// tell jobManager that he's the leader
    +			myLeaderElectionService.isLeader(leaderSessionID);
    +			// tell taskManager who's the leader
    +			myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
    +
    +			Await.ready(isLeader, deadline.timeLeft());
    +			Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +			// submit blocking job
    +			Future<Object> jobSubmitted = jobManager.ask(
    +					new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
    +					deadline.timeLeft());
    +
    +			Await.ready(jobSubmitted, deadline.timeLeft());
    +
    +			// Wait for running
    +			Future<Object> jobRunning = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +					deadline.timeLeft());
    +
    +			Await.ready(jobRunning, deadline.timeLeft());
    +
    +			// terminate the job manager
    +			jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
    +
    +			Future<Boolean> terminatedFuture = Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
    +			Boolean terminated = Await.result(terminatedFuture, deadline.timeLeft());
    +			assertTrue("Failed to stop job manager", terminated);
    +
    +			// job stays in the submitted job graph store
    +			assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +			// start new job manager
    +			myLeaderElectionService.reset();
    +
    +			jobManagerRef = createJobManagerActor(
    +					"jobmanager-1",
    +					flinkConfiguration,
    +					myLeaderElectionService,
    +					mySubmittedJobGraphStore,
    +					500,
    +					timeout,
    +					jobRecoveryTimeout,
    +					archiveRef);
    +
    +			jobManager = new AkkaActorGateway(jobManagerRef, newLeaderSessionID);
    +
    +			Future<Object> isAlive = jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +			isLeader = jobManager.ask(
    +					TestingJobManagerMessages.getNotifyWhenLeader(),
    +					deadline.timeLeft());
    +
    +			isConnectedToJobManager = tmGateway.ask(
    +					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +					deadline.timeLeft());
    +
    +			Await.ready(isAlive, deadline.timeLeft());
    +
    +			// tell new jobManager that he's the leader
    +			myLeaderElectionService.isLeader(newLeaderSessionID);
    +			// tell taskManager who's the leader
    +			myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
    +
    +			Await.ready(isLeader, deadline.timeLeft());
    +			Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +			jobRunning = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +					deadline.timeLeft());
    +
    +			// wait that the job is recovered and reaches state RUNNING
    +			Await.ready(jobRunning, deadline.timeLeft());
    +
    +			Future<Object> jobFinished = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
    +					deadline.timeLeft());
    +
    +			BlockingInvokable.unblock();
    +
    +			// wait til the job has finished
    +			Await.ready(jobFinished, deadline.timeLeft());
    +
    +			// check that the job has been removed from the submitted job graph store
    +			assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +			// Check that the BLOB store files are removed
    +			File rootPath = new File(haStoragePath);
    +
    +			boolean cleanedUpFiles = false;
    +			while (deadline.hasTimeLeft()) {
    +				if (listFiles(rootPath).isEmpty()) {
    --- End diff --
    
    We check that the directory no longer contains files. But we don't check for folders, right? I think that we no longer delete the folders created by the BlobStore. We could maybe check in `BlobStore.cleanUp` whether there are any empty folders which we can delete. Do you think that this could be relevant? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2256#discussion_r71509039
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---
    @@ -77,7 +77,7 @@ public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) {
     
     		// Initializing the clean up task
     		this.cleanupTimer = new Timer(true);
    -		this.cleanupTimer.schedule(this, cleanupInterval);
    +		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    --- End diff --
    
    Good catch :+1:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2256#discussion_r71711908
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
    @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that the persisted job is not removed from the job graph store
    +	 * after the postStop method of the JobManager. Furthermore, it checks
    +	 * that BLOBs of the JobGraph are recovered properly and cleaned up after
    +	 * the job finishes.
    +	 */
    +	@Test
    +	public void testBlobRecoveryAfterLostJobManager() throws Exception {
    +		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +		FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
    +		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
    +		Configuration flinkConfiguration = new Configuration();
    +		UUID leaderSessionID = UUID.randomUUID();
    +		UUID newLeaderSessionID = UUID.randomUUID();
    +		int slots = 2;
    +		ActorRef archiveRef = null;
    +		ActorRef jobManagerRef = null;
    +		ActorRef taskManagerRef = null;
    +
    +		String haStoragePath = temporaryFolder.newFolder().toString();
    +
    +		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
    +		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath);
    +		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
    +
    +		try {
    +			MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
    +			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
    +			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
    +
    +			archiveRef = system.actorOf(Props.create(
    +					MemoryArchivist.class,
    +					10), "archive");
    +
    +			jobManagerRef = createJobManagerActor(
    +					"jobmanager-0",
    +					flinkConfiguration,
    +					myLeaderElectionService,
    +					mySubmittedJobGraphStore,
    +					3600000,
    +					timeout,
    +					jobRecoveryTimeout, archiveRef);
    +
    +			ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID);
    +
    +			taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +					flinkConfiguration,
    +					ResourceID.generate(),
    +					system,
    +					"localhost",
    +					Option.apply("taskmanager"),
    +					Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
    +					true,
    +					TestingTaskManager.class);
    +
    +			ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID);
    +
    +			Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +			Await.ready(tmAlive, deadline.timeLeft());
    +
    +			JobVertex sourceJobVertex = new JobVertex("Source");
    +			sourceJobVertex.setInvokableClass(BlockingInvokable.class);
    +			sourceJobVertex.setParallelism(slots);
    +
    +			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
    +
    +			// Upload fake JAR file to first JobManager
    +			File jarFile = temporaryFolder.newFile();
    +			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));
    +			out.close();
    +
    +			jobGraph.addJar(new Path(jarFile.toURI()));
    +			JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft());
    +
    +			Future<Object> isLeader = jobManager.ask(
    +					TestingJobManagerMessages.getNotifyWhenLeader(),
    +					deadline.timeLeft());
    +
    +			Future<Object> isConnectedToJobManager = tmGateway.ask(
    +					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +					deadline.timeLeft());
    +
    +			// tell jobManager that he's the leader
    +			myLeaderElectionService.isLeader(leaderSessionID);
    +			// tell taskManager who's the leader
    +			myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
    +
    +			Await.ready(isLeader, deadline.timeLeft());
    +			Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +			// submit blocking job
    +			Future<Object> jobSubmitted = jobManager.ask(
    +					new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
    +					deadline.timeLeft());
    +
    +			Await.ready(jobSubmitted, deadline.timeLeft());
    +
    +			// Wait for running
    +			Future<Object> jobRunning = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +					deadline.timeLeft());
    +
    +			Await.ready(jobRunning, deadline.timeLeft());
    +
    +			// terminate the job manager
    +			jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
    +
    +			Future<Boolean> terminatedFuture = Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
    +			Boolean terminated = Await.result(terminatedFuture, deadline.timeLeft());
    +			assertTrue("Failed to stop job manager", terminated);
    +
    +			// job stays in the submitted job graph store
    +			assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +			// start new job manager
    +			myLeaderElectionService.reset();
    +
    +			jobManagerRef = createJobManagerActor(
    +					"jobmanager-1",
    +					flinkConfiguration,
    +					myLeaderElectionService,
    +					mySubmittedJobGraphStore,
    +					500,
    +					timeout,
    +					jobRecoveryTimeout,
    +					archiveRef);
    +
    +			jobManager = new AkkaActorGateway(jobManagerRef, newLeaderSessionID);
    +
    +			Future<Object> isAlive = jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +			isLeader = jobManager.ask(
    +					TestingJobManagerMessages.getNotifyWhenLeader(),
    +					deadline.timeLeft());
    +
    +			isConnectedToJobManager = tmGateway.ask(
    +					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +					deadline.timeLeft());
    +
    +			Await.ready(isAlive, deadline.timeLeft());
    +
    +			// tell new jobManager that he's the leader
    +			myLeaderElectionService.isLeader(newLeaderSessionID);
    +			// tell taskManager who's the leader
    +			myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
    +
    +			Await.ready(isLeader, deadline.timeLeft());
    +			Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +			jobRunning = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +					deadline.timeLeft());
    +
    +			// wait that the job is recovered and reaches state RUNNING
    +			Await.ready(jobRunning, deadline.timeLeft());
    +
    +			Future<Object> jobFinished = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
    +					deadline.timeLeft());
    +
    +			BlockingInvokable.unblock();
    +
    +			// wait til the job has finished
    +			Await.ready(jobFinished, deadline.timeLeft());
    +
    +			// check that the job has been removed from the submitted job graph store
    +			assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +			// Check that the BLOB store files are removed
    +			File rootPath = new File(haStoragePath);
    +
    +			boolean cleanedUpFiles = false;
    +			while (deadline.hasTimeLeft()) {
    +				if (listFiles(rootPath).isEmpty()) {
    --- End diff --
    
    Yes, that is true. We will for example have empty folders `<root>/blob/cache` in this test. I've added a method to try to delete the parent directory when deleting a BLOB (same as what are currently doing in `AbstractFileStateHandle`). I will adjust this check to check that the directory is empty.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on BlobSe...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2256
  
    Thank you for your review. I've addressed your comment and now parent directories are deleted if empty, resulting in an empty storage folder after regular cleanup. If there are no objections, I would like to merge this later today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on BlobSe...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2256
  
    In general, I think it would be helpful for users to be able to retrieve checkpoints of a failed job. I could imagine a scenario where a job is faulty but one only runs into after some time. Being then able to transform a checkpoint into a savepoint and then restarting the failed job with a corrected jar could be helpful.
    
    Thus, I think we should only remove the persisted job data if the job has reached FINISHED or CANCELED. Admittedly, this is a very conservative approach, but then users are less likely to lose data.
    
    However, this should be out of the scope of this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2256


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---