Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/01 20:22:39 UTC

[1/3] flink git commit: [FLINK-5193] [jm] Harden job recovery in case of recovery failures

Repository: flink
Updated Branches:
  refs/heads/release-1.1 59f61bf6c -> 9c058871f


[FLINK-5193] [jm] Harden job recovery in case of recovery failures

When recovering multiple jobs, a single recovery failure caused none of the jobs to be recovered.
This PR makes the recovery of jobs independent of one another, so that a single
failure won't stall the complete recovery. Furthermore, this PR improves the error reporting
for failures originating in the ZooKeeperSubmittedJobGraphStore.

Add test case

Fix failing JobManagerHACheckpointRecoveryITCase
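
For illustration, a minimal Java sketch of the per-job recovery pattern this change
introduces (the real logic lives in the JobManager actor shown in the Scala diff below;
RecoverySink is a hypothetical stand-in for the actor that resubmits recovered jobs):

import java.util.Collection;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;

final class IndependentRecoverySketch {

    interface RecoverySink {
        void resubmit(SubmittedJobGraph jobGraph);
        void log(String message, Exception cause);
    }

    static void recoverAllJobs(SubmittedJobGraphStore store, RecoverySink sink) {
        final Collection<JobID> jobIds;
        try {
            jobIds = store.getJobIds();
        } catch (Exception e) {
            // without the job ids there is nothing to iterate over
            sink.log("Failed to recover job ids from the submitted job graph store. Aborting recovery.", e);
            return;
        }

        for (JobID jobId : jobIds) {
            try {
                SubmittedJobGraph jobGraph = store.recoverJobGraph(jobId);
                if (jobGraph != null) {
                    sink.resubmit(jobGraph);
                }
            } catch (Exception e) {
                // a single failed recovery now only affects this job
                sink.log("Failed to recover job " + jobId + '.', e);
            }
        }
    }
}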


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d314bc52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d314bc52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d314bc52

Branch: refs/heads/release-1.1
Commit: d314bc5235e2573ff77f45d327bc62f521063b71
Parents: 59f61bf
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 29 17:31:08 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 1 17:53:34 2016 +0100

----------------------------------------------------------------------
 .../StandaloneSubmittedJobGraphStore.java       |  11 +-
 .../jobmanager/SubmittedJobGraphStore.java      |  19 ++-
 .../ZooKeeperSubmittedJobGraphStore.java        | 113 +++++++------
 .../zookeeper/ZooKeeperStateHandleStore.java    |  44 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   |  45 ++---
 .../jobmanager/JobManagerHARecoveryTest.java    | 165 ++++++++++++++++++-
 .../StandaloneSubmittedJobGraphStoreTest.java   |  11 +-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |  29 ++--
 .../JobManagerHACheckpointRecoveryITCase.java   |   4 +-
 9 files changed, 315 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index db36f92..8267b9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -20,10 +20,9 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
 
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
@@ -54,12 +53,12 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
 	}
 
 	@Override
-	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
-		return Option.empty();
+	public Collection<JobID> getJobIds() throws Exception {
+		return Collections.emptyList();
 	}
 
 	@Override
-	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
-		return Collections.emptyList();
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index bd628cd..55c2e79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
  * {@link SubmittedJobGraph} instances for recovery.
@@ -40,16 +38,11 @@ public interface SubmittedJobGraphStore {
 	void stop() throws Exception;
 
 	/**
-	 * Returns a list of all submitted {@link JobGraph} instances.
-	 */
-	List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
-
-	/**
 	 * Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
 	 *
 	 * <p>An Exception is thrown, if no job graph with the given ID exists.
 	 */
-	Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
+	SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception;
 
 	/**
 	 * Adds the {@link SubmittedJobGraph} instance.
@@ -64,6 +57,14 @@ public interface SubmittedJobGraphStore {
 	void removeJobGraph(JobID jobId) throws Exception;
 
 	/**
+	 * Returns the job ids of all job graphs submitted to the submitted job graph store.
+	 *
+	 * @return Collection of submitted job ids
+	 * @throws Exception if the operation fails
+	 */
+	Collection<JobID> getJobIds() throws Exception;
+
+	/**
 	 * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
 	 * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
 	 */
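
Note the changed contract: recoverJobGraph now returns the SubmittedJobGraph directly,
or null when the store holds no graph for the given id (see the Standalone and ZooKeeper
implementations below), instead of a scala.Option. A caller-side sketch (resubmit is a
hypothetical helper):

SubmittedJobGraph jobGraph = submittedJobGraphs.recoverJobGraph(jobId);

if (jobGraph == null) {
    // nothing stored under this id; skip recovery for it
} else {
    // hand the recovered graph back to the job submission path
    resubmit(jobGraph);
}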

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7324c07..859d319 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -24,18 +24,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -156,73 +153,41 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	@Override
-	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		String path = getPathForJob(jobId);
+
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
-			LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
-
-			List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
-
-			while (true) {
-				try {
-					submitted = jobGraphsInZooKeeper.getAll();
-					break;
-				}
-				catch (ConcurrentModificationException e) {
-					LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
-				}
-			}
-
-			LOG.info("Found {} job graphs.", submitted.size());
-
-			if (submitted.size() != 0) {
-				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
-
-				for (Tuple2<StateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted) {
-					SubmittedJobGraph jobGraph = jobStateHandle
-							.f0.getState(ClassLoader.getSystemClassLoader());
+			StateHandle<SubmittedJobGraph> submittedJobStateHandle;
 
-					addedJobGraphs.add(jobGraph.getJobId());
 
-					jobGraphs.add(jobGraph);
-				}
-
-				LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs);
-				return jobGraphs;
-			}
-			else {
-				LOG.info("No job graph to recover.");
-				return Collections.emptyList();
+			try {
+				submittedJobStateHandle = jobGraphsInZooKeeper.get(path);
+			} catch (KeeperException.NoNodeException ignored) {
+				return null;
+			} catch (Exception e) {
+				throw new Exception("Could not retrieve the submitted job graph state handle " +
+					"for " + path + " from the submitted job graph store.", e);
 			}
-		}
-	}
-
-	@Override
-	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
-		checkNotNull(jobId, "Job ID");
-		String path = getPathForJob(jobId);
+			
+			SubmittedJobGraph jobGraph;
 
-		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+			LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
 
-		synchronized (cacheLock) {
-			verifyIsRunning();
 
 			try {
-				StateHandle<SubmittedJobGraph> jobStateHandle = jobGraphsInZooKeeper.get(path);
-
-				SubmittedJobGraph jobGraph = jobStateHandle
-						.getState(ClassLoader.getSystemClassLoader());
+				jobGraph = submittedJobStateHandle.getState(getClass().getClassLoader());
+			} catch (Exception e) {
+				throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
+			}
 
-				addedJobGraphs.add(jobGraph.getJobId());
+			addedJobGraphs.add(jobGraph.getJobId());
 
-				LOG.info("Recovered {}.", jobGraph);
+			LOG.info("Recovered {}.", jobGraph);
 
-				return Option.apply(jobGraph);
-			}
-			catch (KeeperException.NoNodeException ignored) {
-				return Option.empty();
-			}
+			return jobGraph;
 		}
 	}
 
@@ -290,6 +255,29 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
+	@Override
+	public Collection<JobID> getJobIds() throws Exception {
+		Collection<String> paths;
+
+		try {
+			paths = jobGraphsInZooKeeper.getAllPaths();
+		} catch (Exception e) {
+			throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
+		}
+
+		List<JobID> jobIds = new ArrayList<>(paths.size());
+
+		for (String path : paths) {
+			try {
+				jobIds.add(jobIdfromPath(path));
+			} catch (Exception exception) {
+				LOG.warn("Could not parse job id from {}.", path, exception);
+			}
+		}
+
+		return jobIds;
+	}
+
 	/**
 	 * Monitors ZooKeeper for changes.
 	 *
@@ -412,4 +400,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		return String.format("/%s", jobId);
 	}
 
+	/**
+	 * Returns the JobID from the given path in ZooKeeper.
+	 *
+	 * @param path in ZooKeeper
+	 * @return JobID associated with the given path
+	 */
+	public static JobID jobIdfromPath(final String path) {
+		return JobID.fromHexString(path);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 6576ff8..0d63a15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,8 +30,11 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -226,10 +229,45 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	public StateHandle<T> get(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-		byte[] data = client.getData().forPath(pathInZooKeeper);
+		byte[] data;
 
-		return (StateHandle<T>) InstantiationUtil
-				.deserializeObject(data, ClassLoader.getSystemClassLoader());
+		try {
+			data = client.getData().forPath(pathInZooKeeper);
+		} catch (Exception e) {
+			throw new Exception("Failed to retrieve state handle data under " + pathInZooKeeper +
+				" from ZooKeeper.", e);
+		}
+
+		try {
+			return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new Exception("Failed to deserialize state handle from ZooKeeper data from " +
+				pathInZooKeeper + '.', e);
+		}
+	}
+
+	/**
+	 * Return a list of all valid paths for state handles.
+	 *
+	 * @return List of valid state handle paths in ZooKeeper
+	 * @throws Exception if a ZooKeeper operation fails
+	 */
+	public Collection<String> getAllPaths() throws Exception {
+		final String path = "/";
+
+		while(true) {
+			Stat stat = client.checkExists().forPath(path);
+
+			if (stat == null) {
+				return Collections.emptyList();
+			} else {
+				try {
+					return client.getChildren().forPath(path);
+				} catch (KeeperException.NoNodeException ignored) {
+					// Concurrent deletion, retry
+				}
+			}
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9061db4..9f6e2db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -471,6 +471,8 @@ class JobManager(
 
     case RecoverSubmittedJob(submittedJobGraph) =>
       if (!currentJobs.contains(submittedJobGraph.getJobId)) {
+        log.info(s"Submitting recovered job ${submittedJobGraph.getJobId}.")
+
         submitJob(
           submittedJobGraph.getJobGraph(),
           submittedJobGraph.getJobInfo(),
@@ -492,7 +494,7 @@ class JobManager(
             log.info(s"Attempting to recover job $jobId.")
             val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
 
-            submittedJobGraphOption match {
+            Option(submittedJobGraphOption) match {
               case Some(submittedJobGraph) =>
                 if (!leaderElectionService.hasLeadership()) {
                   // we've lost leadership. mission: abort.
@@ -505,37 +507,31 @@ class JobManager(
             }
           }
         } catch {
-          case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
+          case t: Throwable => log.warn(s"Failed to recover job $jobId.", t)
         }
       }(context.dispatcher)
 
     case RecoverAllJobs =>
       future {
-        try {
-          // The ActorRef, which is part of the submitted job graph can only be
-          // de-serialized in the scope of an actor system.
-          akka.serialization.JavaSerializer.currentSystem.withValue(
-            context.system.asInstanceOf[ExtendedActorSystem]) {
+        log.info("Attempting to recover all jobs.")
 
-            log.info(s"Attempting to recover all jobs.")
-
-            val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+        try {
+          val jobIdsToRecover = submittedJobGraphs.getJobIds().asScala
 
-            if (!leaderElectionService.hasLeadership()) {
-              // we've lost leadership. mission: abort.
-              log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
-                s"jobs.")
-            } else {
-              log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
+          if (jobIdsToRecover.isEmpty) {
+            log.info("There are no jobs to recover.")
+          } else {
+            log.info(s"There are ${jobIdsToRecover.size} jobs to recover. Starting the job " +
+                       s"recovery.")
 
-              jobGraphs.foreach{
-                submittedJobGraph =>
-                  self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
-              }
+            jobIdsToRecover foreach {
+              jobId => self ! decorateMessage(RecoverJob(jobId))
             }
           }
         } catch {
-          case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
+          case e: Exception =>
+            log.warn("Failed to recover job ids from submitted job graph store. Aborting " +
+                       "recovery.", e)
         }
       }(context.dispatcher)
 
@@ -1039,7 +1035,12 @@ class JobManager(
    * @param jobInfo the job info
    * @param isRecovery Flag indicating whether this is a recovery or initial submission
    */
-  private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
+  private def submitJob(
+      jobGraph: JobGraph,
+      jobInfo: JobInfo,
+      isRecovery: Boolean = false)
+    : Unit = {
+
     if (jobGraph == null) {
       jobInfo.client ! decorateMessage(JobResultFailure(
         new SerializedThrowable(

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index b98f338..b78f1fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,8 +20,13 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.japi.pf.FI;
+import akka.japi.pf.ReceiveBuilder;
+import akka.pattern.Patterns;
+import akka.testkit.CallingThreadDispatcher;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
@@ -29,15 +34,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -49,10 +57,12 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -62,6 +72,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -69,25 +80,35 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import scala.Int;
 import scala.Option;
+import scala.PartialFunction;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
+import scala.runtime.BoxedUnit;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class JobManagerHARecoveryTest {
 
@@ -288,6 +309,134 @@ public class JobManagerHARecoveryTest {
 	}
 
 	/**
+	 * Tests that a failing job recovery won't cause other job recoveries to fail.
+	 */
+	@Test
+	public void testFailingJobRecovery() throws Exception {
+		final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+		final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
+		Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
+		final Configuration flinkConfiguration = new Configuration();
+		UUID leaderSessionID = UUID.randomUUID();
+		ActorRef jobManager = null;
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+
+		// set HA mode to zookeeper so that we try to recover jobs
+		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+
+		try {
+			final SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+
+			SubmittedJobGraph submittedJobGraph = mock(SubmittedJobGraph.class);
+			when(submittedJobGraph.getJobId()).thenReturn(jobId2);
+
+			when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2));
+
+			// fail the first job recovery
+			when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new Exception("Test exception"));
+			// succeed the second job recovery
+			when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph);
+
+			final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
+
+			final Collection<JobID> recoveredJobs = new ArrayList<>(2);
+
+			Props jobManagerProps = Props.create(
+				TestingFailingHAJobManager.class,
+				flinkConfiguration,
+				TestExecutors.directExecutor(),
+				TestExecutors.directExecutor(),
+				mock(InstanceManager.class),
+				mock(Scheduler.class),
+				new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+				ActorRef.noSender(),
+				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+				timeout,
+				myLeaderElectionService,
+				submittedJobGraphStore,
+				mock(CheckpointRecoveryFactory.class),
+				mock(SavepointStore.class),
+				jobRecoveryTimeout,
+				Option.<MetricRegistry>apply(null),
+				recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
+
+			jobManager = system.actorOf(jobManagerProps, "jobmanager");
+
+			Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
+
+			Await.ready(started, deadline.timeLeft());
+
+			// make the job manager the leader --> this triggers the recovery of all jobs
+			myLeaderElectionService.isLeader(leaderSessionID);
+
+			// check that we have successfully recovered the second job
+			assertThat(recoveredJobs, containsInAnyOrder(jobId2));
+		} finally {
+			TestingUtils.stopActor(jobManager);
+		}
+	}
+
+	static class TestingFailingHAJobManager extends JobManager {
+
+		private final Collection<JobID> recoveredJobs;
+
+		public TestingFailingHAJobManager(
+			Configuration flinkConfiguration,
+			Executor futureExecutor,
+			Executor ioExecutor,
+			InstanceManager instanceManager,
+			Scheduler scheduler,
+			BlobLibraryCacheManager libraryCacheManager,
+			ActorRef archive,
+			RestartStrategyFactory restartStrategyFactory,
+			FiniteDuration timeout,
+			LeaderElectionService leaderElectionService,
+			SubmittedJobGraphStore submittedJobGraphs,
+			CheckpointRecoveryFactory checkpointRecoveryFactory,
+			SavepointStore savepointStore,
+			FiniteDuration jobRecoveryTimeout,
+			Option<MetricRegistry> metricRegistry,
+			Collection<JobID> recoveredJobs) {
+			super(
+				flinkConfiguration,
+				futureExecutor,
+				ioExecutor,
+				instanceManager,
+				scheduler,
+				libraryCacheManager,
+				archive,
+				restartStrategyFactory,
+				timeout,
+				leaderElectionService,
+				submittedJobGraphs,
+				checkpointRecoveryFactory,
+				savepointStore,
+				jobRecoveryTimeout,
+				metricRegistry);
+
+			this.recoveredJobs = recoveredJobs;
+		}
+
+		@Override
+		public PartialFunction<Object, BoxedUnit> handleMessage() {
+			return ReceiveBuilder.match(
+				JobManagerMessages.RecoverSubmittedJob.class,
+				new FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>() {
+					@Override
+					public void apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception {
+						recoveredJobs.add(submitJob.submittedJobGraph().getJobId());
+					}
+				}).matchAny(new FI.UnitApply<Object>() {
+				@Override
+				public void apply(Object o) throws Exception {
+					TestingFailingHAJobManager.super.handleMessage().apply(o);
+				}
+			}).build();
+		}
+	}
+
+	/**
 	 * A checkpoint store, which supports shutdown and suspend. You can use this to test HA
 	 * as long as the factory always returns the same store instance.
 	 */
@@ -383,16 +532,11 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
-			return new ArrayList<>(storedJobs.values());
-		}
-
-		@Override
-		public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+		public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 			if (storedJobs.containsKey(jobId)) {
-				return Option.apply(storedJobs.get(jobId));
+				return storedJobs.get(jobId);
 			} else {
-				return Option.apply(null);
+				return null;
 			}
 		}
 
@@ -406,6 +550,11 @@ public class JobManagerHARecoveryTest {
 			storedJobs.remove(jobId);
 		}
 
+		@Override
+		public Collection<JobID> getJobIds() throws Exception {
+			return storedJobs.keySet();
+		}
+
 		boolean contains(JobID jobId) {
 			return storedJobs.containsKey(jobId);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
index 8ebb7f8..079a10e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -19,14 +19,13 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 
 public class StandaloneSubmittedJobGraphStoreTest {
 
@@ -41,14 +40,14 @@ public class StandaloneSubmittedJobGraphStoreTest {
 				new JobGraph("testNoOps"),
 				new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
 
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
 		jobGraphs.putJobGraph(jobGraph);
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
 		jobGraphs.removeJobGraph(jobGraph.getJobGraph().getJobID());
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
-		assertTrue(jobGraphs.recoverJobGraph(new JobID()).isEmpty());
+		assertNull(jobGraphs.recoverJobGraph(new JobID()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 8eaecd0..cc9e815 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -36,8 +36,8 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
@@ -93,32 +93,36 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Add initial
 			jobGraphs.putJobGraph(jobGraph);
 
 			// Verify initial job graph
-			List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
-			assertEquals(1, actual.size());
+			Collection<JobID> jobIds = jobGraphs.getJobIds();
+			assertEquals(1, jobIds.size());
 
-			verifyJobGraphs(jobGraph, actual.get(0));
+			JobID jobId = jobIds.iterator().next();
+
+			verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
 
 			// Update (same ID)
 			jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1);
 			jobGraphs.putJobGraph(jobGraph);
 
 			// Verify updated
-			actual = jobGraphs.recoverJobGraphs();
-			assertEquals(1, actual.size());
+			jobIds = jobGraphs.getJobIds();
+			assertEquals(1, jobIds.size());
+
+			jobId = jobIds.iterator().next();
 
-			verifyJobGraphs(jobGraph, actual.get(0));
+			verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
 
 			// Remove
 			jobGraphs.removeJobGraph(jobGraph.getJobId());
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Nothing should have been notified
 			verify(listener, atMost(1)).onAddedJobGraph(any(JobID.class));
@@ -154,11 +158,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 				jobGraphs.putJobGraph(jobGraph);
 			}
 
-			List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
+			Collection<JobID> actual = jobGraphs.getJobIds();
 
 			assertEquals(expected.size(), actual.size());
 
-			for (SubmittedJobGraph jobGraph : actual) {
+			for (JobID jobId : actual) {
+				SubmittedJobGraph jobGraph = jobGraphs.recoverJobGraph(jobId);
 				assertTrue(expected.containsKey(jobGraph.getJobId()));
 
 				verifyJobGraphs(expected.get(jobGraph.getJobId()), jobGraph);
@@ -167,7 +172,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 			}
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Nothing should have been notified
 			verify(listener, atMost(expected.size())).onAddedJobGraph(any(JobID.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/d314bc52/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 262f78a..e598ac5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -364,7 +364,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				nonLeadingJobManagerProcess = jobManagerProcess[0];
 			}
 
-			// BLocking JobGraph
+			// Blocking JobGraph
 			JobVertex blockingVertex = new JobVertex("Blocking vertex");
 			blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
 			JobGraph jobGraph = new JobGraph(blockingVertex);
@@ -393,7 +393,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				String output = nonLeadingJobManagerProcess.getProcessOutput();
 
 				if (output != null) {
-					if (output.contains("Fatal error: Failed to recover jobs") &&
+					if (output.contains("Failed to recover job") &&
 							output.contains("java.io.FileNotFoundException")) {
 
 						success = true;


[3/3] flink git commit: [FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation

Posted by tr...@apache.org.
[FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation

Adds exception handling to the stream operators' snapshotState method. In case of an
exception while performing the snapshot operation, all data checkpointed up to that point
is discarded/deleted. This makes sure that a failing checkpoint operation won't leave
orphaned checkpoint data (e.g. files) behind.

Add test case for FsCheckpointStateOutputStream

Add RocksDB FullyAsyncSnapshot cleanup test

Add proper state cleanup tests for window operator

Add state cleanup test for failing snapshot call of AbstractUdfStreamOperator
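
Condensed, the cleanup pattern applied across the operators below looks roughly like this
(a hedged sketch; TaskState, CheckpointOutput and SnapshotWriter are hypothetical stand-ins
for StreamTaskState, the checkpoint state output stream/view and the operator-specific
serialization code in the actual diffs):

interface CheckpointOutput {
    Object closeAndGetHandle() throws Exception;
    void close() throws Exception;
}

interface TaskState {
    void setOperatorState(Object handle);
    void discardState() throws Exception;
}

interface SnapshotWriter {
    void write(CheckpointOutput out) throws Exception;
}

final class SnapshotCleanupSketch {

    static void snapshotWithCleanup(TaskState taskState, CheckpointOutput out, SnapshotWriter writer) throws Exception {
        try {
            // serialize the operator state into the checkpoint output
            writer.write(out);
        } catch (Exception e) {
            discardQuietly(taskState);
            try {
                // closing the output deletes the data written so far
                out.close();
            } catch (Exception closeException) {
                // best effort; the written data might not be deleted
            }
            throw new Exception("Could not write operator state to the checkpoint output.", e);
        }

        try {
            taskState.setOperatorState(out.closeAndGetHandle());
        } catch (Exception e) {
            discardQuietly(taskState);
            throw new Exception("Could not close the checkpoint output and obtain the state handle.", e);
        }
    }

    private static void discardQuietly(TaskState taskState) {
        try {
            taskState.discardState();
        } catch (Exception discardException) {
            // best effort; logged with a warning in the actual operators
        }
    }
}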


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c058871
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c058871
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c058871

Branch: refs/heads/release-1.1
Commit: 9c058871f778f748059829b1b350687e3f789f6f
Parents: 4b734d7
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 1 13:25:05 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 1 18:04:43 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 102 ++++++----
 .../state/RocksDBAsyncKVSnapshotTest.java       |  67 ++++++-
 .../operator/AbstractCEPPatternOperator.java    |  67 +++++--
 .../AbstractKeyedCEPPatternOperator.java        |  53 +++++-
 .../runtime/state/AbstractStateBackend.java     |  25 ++-
 .../state/filesystem/FsStateBackend.java        |  79 ++++++--
 .../apache/flink/runtime/taskmanager/Task.java  |   4 +-
 .../FsCheckpointStateOutputStreamTest.java      | 146 ++++++++++++--
 .../source/ContinuousFileReaderOperator.java    |  80 ++++++--
 .../api/operators/AbstractStreamOperator.java   |  29 ++-
 .../operators/AbstractUdfStreamOperator.java    |  19 +-
 .../operators/GenericWriteAheadSink.java        |  37 +++-
 ...ractAlignedProcessingTimeWindowOperator.java |  59 +++++-
 .../operators/windowing/WindowOperator.java     |  50 ++++-
 .../streaming/runtime/tasks/StreamTask.java     |  29 ++-
 .../runtime/tasks/StreamTaskStateList.java      |   2 +-
 .../streaming/api/operators/StreamMapTest.java  |  64 ++++++-
 .../operators/windowing/WindowOperatorTest.java | 190 ++++++++++++++++++-
 18 files changed, 960 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 0412a4a..1561afc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -750,59 +750,72 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 		@Override
 		public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
-			try {
-				long startTime = System.currentTimeMillis();
+			long startTime = System.currentTimeMillis();
+			CheckpointStateOutputView outputView;
 
-				CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
+			try {
+				try {
+					outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
+				} catch (Exception e) {
+					throw new Exception("Could not create a checkpoint state output view to " +
+						"materialize the checkpoint data into.", e);
+				}
 
-				outputView.writeInt(columnFamilies.size());
+				try {
+					outputView.writeInt(columnFamilies.size());
 
-				// we don't know how many key/value pairs there are in each column family.
-				// We prefix every written element with a byte that signifies to which
-				// column family it belongs, this way we can restore the column families
-				byte count = 0;
-				Map<String, Byte> columnFamilyMapping = new HashMap<>();
-				for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
-					columnFamilyMapping.put(column.getKey(), count);
+					// we don't know how many key/value pairs there are in each column family.
+					// We prefix every written element with a byte that signifies to which
+					// column family it belongs, this way we can restore the column families
+					byte count = 0;
+					Map<String, Byte> columnFamilyMapping = new HashMap<>();
+					for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
+						columnFamilyMapping.put(column.getKey(), count);
 
-					outputView.writeByte(count);
+						outputView.writeByte(count);
 
-					ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
-					ooOut.writeObject(column.getValue().f1);
-					ooOut.flush();
+						ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
+						ooOut.writeObject(column.getValue().f1);
+						ooOut.flush();
 
-					count++;
-				}
+						count++;
+					}
 
-				ReadOptions readOptions = new ReadOptions();
-				readOptions.setSnapshot(snapshot);
+					ReadOptions readOptions = new ReadOptions();
+					readOptions.setSnapshot(snapshot);
 
-				for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
-					byte columnByte = columnFamilyMapping.get(column.getKey());
+					for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
+						byte columnByte = columnFamilyMapping.get(column.getKey());
 
-					synchronized (dbCleanupLock) {
-						if (db == null) {
-							throw new RuntimeException("RocksDB instance was disposed. This happens " +
+						synchronized (dbCleanupLock) {
+							if (db == null) {
+								throw new RuntimeException("RocksDB instance was disposed. This happens " +
 									"when we are in the middle of a checkpoint and the job fails.");
-						}
-						RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
-						iterator.seekToFirst();
-						while (iterator.isValid()) {
-							outputView.writeByte(columnByte);
-							BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
+							}
+							RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
+							iterator.seekToFirst();
+							while (iterator.isValid()) {
+								outputView.writeByte(columnByte);
+								BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
 									outputView);
-							BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
+								BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
 									outputView);
-							iterator.next();
+								iterator.next();
+							}
 						}
 					}
-				}
-
-				StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();
+				} catch (Exception e) {
+					try {
+						// closing the output view deletes the underlying data
+						outputView.close();
+					} catch (Exception closingException) {
+						LOG.warn("Could not close the checkpoint state output view. The " +
+							"written data might not be deleted.", closingException);
+					}
 
-				long endTime = System.currentTimeMillis();
-				LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
-				return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
+					throw new Exception("Could not write the checkpoint data into the checkpoint " +
+						"state output view.", e);
+				}
 			} finally {
 				synchronized (dbCleanupLock) {
 					if (db != null) {
@@ -811,6 +824,19 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				}
 				snapshot = null;
 			}
+
+			StateHandle<DataInputView> stateHandle;
+
+			try {
+				stateHandle = outputView.closeAndGetHandle();
+			} catch (Exception ioE) {
+				throw new Exception("Could not close the checkpoint state output view and " +
+					"obtain the state handle.", ioE);
+			}
+
+			long endTime = System.currentTimeMillis();
+			LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - startTime));
+			return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
index a58686b..24728d7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
@@ -27,8 +27,13 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
+import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -48,7 +53,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -56,12 +63,22 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for asynchronous RocksDB Key/Value state checkpoints.
@@ -72,13 +89,16 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("serial")
 public class RocksDBAsyncKVSnapshotTest {
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Before
 	public void checkOperatingSystem() {
 		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
 	}
 
 	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchonously.
+	 * This ensures that asynchronous state handles are actually materialized asynchronously.
 	 *
 	 * <p>We use latches to block at various stages and see if the code still continues through
 	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
@@ -180,7 +200,7 @@ public class RocksDBAsyncKVSnapshotTest {
 	}
 
 	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchonously.
+	 * This ensures that asynchronous state handles are actually materialized asynchronously.
 	 *
 	 * <p>We use latches to block at various stages and see if the code still continues through
 	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
@@ -282,6 +302,49 @@ public class RocksDBAsyncKVSnapshotTest {
 		testHarness.waitForTaskCompletion();
 	}
 
+	@Test
+	public void testCleanupOfFullyAsyncSnapshotsInFailureCase() throws Exception {
+		long checkpointId = 1L;
+		long timestamp = 42L;
+
+		File chkDir = temporaryFolder.newFolder();
+		File dbDir = temporaryFolder.newFolder();
+
+		Environment env = new DummyEnvironment("test task", 1, 0);
+		AbstractStateBackend.CheckpointStateOutputStream outputStream = mock(AbstractStateBackend.CheckpointStateOutputStream.class);
+		doThrow(new IOException("Test exception")).when(outputStream).write(anyInt());
+
+		RocksDBStateBackend backend = spy(new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend()));
+		doReturn(outputStream).when(backend).createCheckpointStateOutputStream(anyLong(), anyLong());
+		backend.setDbStoragePath(dbDir.getAbsolutePath());
+		backend.enableFullyAsyncSnapshots();
+
+		backend.initializeForJob(
+			env,
+			"test operator",
+			VoidSerializer.INSTANCE
+		);
+		backend.getPartitionedState(null, VoidSerializer.INSTANCE, new ValueStateDescriptor<>("foobar", Object.class, new Object()));
+
+		Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStateSnapshotHashMap = backend.snapshotPartitionedState(checkpointId, timestamp);
+
+		for (KvStateSnapshot<?, ?, ?, ?, ?> kvStateSnapshot : kvStateSnapshotHashMap.values()) {
+			if (kvStateSnapshot instanceof AsynchronousKvStateSnapshot) {
+				AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asynchronousKvStateSnapshot = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStateSnapshot;
+				try {
+					asynchronousKvStateSnapshot.materialize();
+					fail("Expected an Exception here.");
+				} catch (Exception expected) {
+					//expected exception
+				}
+			} else {
+				fail("Expected an asynchronous kv state snapshot.");
+			}
+		}
+
+		verify(outputStream).close();
+	}
+
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index b523f46..4b5a703 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -111,22 +111,67 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
-		final AbstractStateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream(
-			checkpointId,
-			timestamp);
+		final AbstractStateBackend.CheckpointStateOutputStream os;
+
+		try {
+			os = this.getStateBackend().createCheckpointStateOutputStream(
+				checkpointId,
+				timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not create checkpoint state output stream for " +
+				getOperatorName() + '.', e);
+		}
 
-		final ObjectOutputStream oos = new ObjectOutputStream(os);
-		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+		try {
+			final ObjectOutputStream oos = new ObjectOutputStream(os);
+			final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
 
-		oos.writeObject(nfa);
+			oos.writeObject(nfa);
+			oos.writeInt(priorityQueue.size());
+			oos.flush();
 
-		ov.writeInt(priorityQueue.size());
+			for (StreamRecord<IN> streamRecord : priorityQueue) {
+				streamRecordSerializer.serialize(streamRecord, ov);
+			}
+
+			ov.flush();
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
 
-		for (StreamRecord<IN> streamRecord: priorityQueue) {
-			streamRecordSerializer.serialize(streamRecord, ov);
+			try {
+				// closing the output stream should delete the written data
+				os.close();
+			} catch (Exception closeException) {
+				LOG.warn("Could not close the checkpoint state output stream of {}. The written " +
+					"data might not be deleted.", getOperatorName(), closeException);
+			}
+
+			throw new Exception("Could not write state for " + getOperatorName() +
+				" to checkpoint state output stream.", e);
 		}
 
-		taskState.setOperatorState(os.closeAndGetHandle());
+		try {
+			taskState.setOperatorState(os.closeAndGetHandle());
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not close and get state handle from checkpoint state " +
+				"output stream of " + getOperatorName() + '.', e);
+		}
 
 		return taskState;
 	}
@@ -144,7 +189,7 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 
 		nfa = (NFA<IN>)ois.readObject();
 
-		int numberPriorityQueueEntries = div.readInt();
+		int numberPriorityQueueEntries = ois.readInt();
 
 		priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 03e40ac..cb456b8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -191,15 +191,58 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
-		AbstractStateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		final AbstractStateBackend.CheckpointStateOutputView ov;
+
+		try {
+			ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
 
-		ov.writeInt(keys.size());
+			throw new Exception("Could not create checkpoint state output view for " +
+				getOperatorName() + '.', e);
+		}
 
-		for (KEY key: keys) {
-			keySerializer.serialize(key, ov);
+		try {
+			ov.writeInt(keys.size());
+
+			for (KEY key : keys) {
+				keySerializer.serialize(key, ov);
+			}
+		} catch (Exception exception) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			try {
+				// closing the output view should delete any written data
+				ov.close();
+			} catch (IOException closingException) {
+				LOG.warn("Could not close the checkpoint state output view of {}. The written data " +
+					"might not be deleted.", getOperatorName(), closingException);
+			}
+
+			throw new Exception("Could not write state of " + getOperatorName() +
+				" to the checkpoint state output view.", exception);
 		}
 
-		taskState.setOperatorState(ov.closeAndGetHandle());
+		try {
+			taskState.setOperatorState(ov.closeAndGetHandle());
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not close and get state handle from checkpoint state " +
+				"output view of " + getOperatorName() + '.', e);
+		}
 
 		return taskState;
 	}

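The hunks above, and the analogous ones further down for ContinuousFileReaderOperator, AbstractAlignedProcessingTimeWindowOperator and WindowOperator, all apply the same defensive snapshot pattern: discard the task state taken so far, close the stream so its data gets deleted, and rethrow with the operator name attached. A minimal sketch of that pattern, using placeholder types (StateStream, StateHandle) rather than the Flink classes:

    // Placeholder types standing in for the checkpoint stream and handle classes.
    interface StateHandle { void discard() throws Exception; }

    interface StateStream {
        void write(byte[] data) throws Exception;
        void close() throws Exception;            // closing should delete written data
        StateHandle closeAndGetHandle() throws Exception;
    }

    final class SnapshotCleanupSketch {
        StateHandle snapshot(StateStream out, byte[] state, Runnable discardPartial) throws Exception {
            try {
                out.write(state);
            } catch (Exception e) {
                discardPartial.run();              // drop what was snapshotted so far
                try {
                    out.close();
                } catch (Exception closeEx) {
                    // log only; the written data might not be deleted
                }
                throw new Exception("Could not write state to the checkpoint stream.", e);
            }
            try {
                return out.closeAndGetHandle();
            } catch (Exception e) {
                discardPartial.run();
                throw new Exception("Could not close and get the state handle.", e);
            }
        }
    }
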
http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index ab9854c..068c6b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -38,6 +38,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.execution.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -56,6 +58,8 @@ public abstract class AbstractStateBackend implements java.io.Serializable, Clos
 
 	private static final long serialVersionUID = 4620413814639220247L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractStateBackend.class);
+
 	protected transient TypeSerializer<?> keySerializer;
 
 	protected transient ClassLoader userCodeClassLoader;
@@ -354,10 +358,25 @@ public abstract class AbstractStateBackend implements java.io.Serializable, Clos
 		if (keyValueStates != null) {
 			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
 
-			for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
-				KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
-				snapshots.put(entry.getKey(), snapshot);
+			try {
+				for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
+					KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
+					snapshots.put(entry.getKey(), snapshot);
+				}
+			} catch (Exception e) {
+				for (Map.Entry<String, KvStateSnapshot<?, ?, ?, ?, ?>> entry : snapshots.entrySet()) {
+					KvStateSnapshot<?, ?, ?, ?, ?> kvStateSnapshot = entry.getValue();
+
+					try {
+						kvStateSnapshot.discardState();
+					} catch (Exception discardException) {
+						LOG.warn("Could not discard partitioned state {}.", entry.getKey(), discardException);
+					}
+				}
+
+				throw new Exception("Could not create a partitioned state snapshot.", e);
 			}
+
 			return snapshots;
 		}
 

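The rollback loop added here is the generic "undo the snapshots taken so far" shape. A self-contained sketch of it, with Callable standing in for KvState.snapshot() and Snapshot as a placeholder type:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;

    interface Snapshot { void discardState() throws Exception; }

    final class PartialSnapshotRollback {
        Map<String, Snapshot> snapshotAll(Map<String, Callable<Snapshot>> states) throws Exception {
            Map<String, Snapshot> done = new HashMap<>();
            try {
                for (Map.Entry<String, Callable<Snapshot>> entry : states.entrySet()) {
                    done.put(entry.getKey(), entry.getValue().call());
                }
                return done;
            } catch (Exception e) {
                // undo the snapshots that already succeeded; failures here are
                // log-only, as in the hunk above
                for (Snapshot snapshot : done.values()) {
                    try {
                        snapshot.discardState();
                    } catch (Exception discardException) {
                        // log only
                    }
                }
                throw new Exception("Could not create a partitioned state snapshot.", e);
            }
        }
    }
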
http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 446f3ea..e783264 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -363,6 +363,8 @@ public class FsStateBackend extends AbstractStateBackend {
 			os.flush();
 
 			return stream.closeAndGetHandle().toSerializableHandle();
+		} catch (IOException ioE) {
+			throw new IOException("Could not serialize state.", ioE);
 		}
 	}
 
@@ -602,6 +604,8 @@ public class FsStateBackend extends AbstractStateBackend {
 		 * If the stream is only closed, we remove the produced file (cleanup through the auto close
 		 * feature, for example). This method throws no exception if the deletion fails, but only
 		 * logs the error.
+		 *
+		 * Important: This method should never throw any {@link Throwable}.
 		 */
 		@Override
 		public void close() {
@@ -620,15 +624,19 @@ public class FsStateBackend extends AbstractStateBackend {
 				if (outStream != null) {
 					try {
 						outStream.close();
-						fs.delete(statePath, false);
-
-						// attempt to delete the parent (will fail and be ignored if the parent has more files)
+					} catch (Throwable e) {
+						LOG.warn("Could not close the state stream for {}.", statePath, e);
+					} finally {
 						try {
-							fs.delete(basePath, false);
-						} catch (IOException ignored) {}
-					}
-					catch (Exception e) {
-						LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e);
+							fs.delete(statePath, false);
+
+							// attempt to delete the parent (will fail and be ignored if the parent has more files)
+							try {
+								fs.delete(basePath, false);
+							} catch (Throwable ignored) {}
+						} catch (Throwable ioE) {
+							LOG.warn("Could not delete stream file for {}.", statePath, ioE);
+						}
 					}
 				}
 			}
@@ -650,12 +658,29 @@ public class FsStateBackend extends AbstractStateBackend {
 						byte[] bytes = Arrays.copyOf(writeBuffer, pos);
 						pos = writeBuffer.length;
 						return new ByteStreamStateHandle(bytes);
-					}
-					else {
-						flush();
-						outStream.close();
-						closed = true;
-						pos = writeBuffer.length;
+					} else {
+						try {
+							flush();
+							outStream.close();
+						} catch (Exception exception) {
+							LOG.warn("Could not close the file system output stream. Trying to delete the underlying file.");
+
+							try {
+								fs.delete(statePath, false);
+
+								try {
+									fs.delete(basePath, false);
+								} catch (Throwable ignored) {}
+							} catch (Throwable deleteException) {
+							LOG.warn("Could not delete closed and discarded state stream for {}.", statePath, deleteException);
+							}
+
+							throw new IOException("Could not close the file system output stream.", exception);
+						} finally {
+							closed = true;
+							pos = writeBuffer.length;
+						}
+
 						return new FileStreamStateHandle(statePath);
 					}
 				}
@@ -680,10 +705,28 @@ public class FsStateBackend extends AbstractStateBackend {
 					}
 
 					// close all resources
-					flush();
-					outStream.close();
-					closed = true;
-					pos = writeBuffer.length;
+					try {
+						flush();
+						outStream.close();
+					} catch (Exception exception) {
+						LOG.warn("Could not close the file system output stream. Trying to delete the underlying file.");
+
+						try {
+							fs.delete(statePath, false);
+
+							try {
+								fs.delete(basePath, false);
+							} catch (Throwable ignored) {}
+						} catch (Throwable deleteException) {
+							LOG.warn("Could not delete closed and discarded state stream for {}.", statePath, deleteException);
+						}
+
+						throw new IOException("Could not close the file system output stream.", exception);
+					} finally {
+						closed = true;
+						pos = writeBuffer.length;
+					}
+
 					return statePath;
 				}
 				else {

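The close() contract spelled out in the new javadoc ("should never throw any Throwable") combined with best-effort deletion reduces to the following shape; a sketch that uses java.nio file operations instead of Flink's FileSystem:

    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class NeverThrowingClose {
        // close() must never throw: every step is wrapped, and the delete is
        // attempted even if closing the wrapped stream fails.
        void close(OutputStream out, Path statePath, Path basePath) {
            try {
                out.close();
            } catch (Throwable t) {
                // log only; fall through to the delete attempt
            } finally {
                try {
                    Files.deleteIfExists(statePath);
                    // deleting the parent fails (and is ignored) if it still has files
                    try {
                        Files.delete(basePath);
                    } catch (Throwable ignored) {}
                } catch (Throwable t) {
                    // log only; the file may be left behind
                }
            }
        }
    }
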
http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 9565666..e4e1b36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1008,8 +1008,8 @@ public class Task implements Runnable {
 						catch (Throwable t) {
 							if (getExecutionState() == ExecutionState.RUNNING) {
 								failExternally(new Exception(
-									"Error while triggering checkpoint for " + taskName,
-									t));
+									"Error while triggering checkpoint " + checkpointID + " for " +
+										taskName, t));
 							}
 						}
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
index 3aba9e1..30747ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
@@ -25,33 +26,50 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
 import java.util.Random;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class FsCheckpointStateOutputStreamTest {
 
-	/** The temp dir, obtained in a platform neutral way */
-	private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());
+	/** The temp dir */
+	private Path tempDirPath = null;
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Before
+	public void setup() throws IOException {
+		tempDirPath = new Path(temporaryFolder.newFolder().toURI());
+	}
 
 	@Test(expected = IllegalArgumentException.class)
-	public void testWrongParameters() {
+	public void testWrongParameters() throws IOException {
 		// this should fail
 		new FsStateBackend.FsCheckpointStateOutputStream(
-			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
+			tempDirPath, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
 	}
 
 	@Test
 	public void testEmptyState() throws Exception {
 		AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
-			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
+			tempDirPath, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
 		
 		StreamStateHandle handle = stream.closeAndGetHandle();
 		assertTrue(handle instanceof ByteStreamStateHandle);
@@ -63,7 +81,7 @@ public class FsCheckpointStateOutputStreamTest {
 	@Test
 	public void testCloseAndGetPath() throws Exception {
 		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH,
+				tempDirPath,
 				FileSystem.getLocalFileSystem(),
 				new HashSet<FsCheckpointStateOutputStream>(),
 				1024,
@@ -83,13 +101,13 @@ public class FsCheckpointStateOutputStreamTest {
 		final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
 
 		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		assertFalse(stream1.isClosed());
 		assertFalse(stream2.isClosed());
@@ -122,13 +140,13 @@ public class FsCheckpointStateOutputStreamTest {
 		final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
 
 		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		assertTrue(openStreams.contains(stream1));
 		assertTrue(openStreams.contains(stream2));
@@ -174,10 +192,110 @@ public class FsCheckpointStateOutputStreamTest {
 		runTest(16678, 4096, 0, true);
 	}
 
+	/**
+	 * Tests that the underlying stream file is deleted upon calling close.
+	 */
+	@Test
+	public void testCleanupWhenClosingStream() throws IOException {
+
+		final FileSystem fs = mock(FileSystem.class);
+		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+		final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+		when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+
+		AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
+			tempDirPath,
+			fs,
+			new HashSet<FsCheckpointStateOutputStream>(),
+			4,
+			0);
+
+		// this should create the underlying file stream
+		stream.write(new byte[]{1,2,3,4,5});
+
+		verify(fs).create(any(Path.class), anyBoolean());
+
+		stream.close();
+
+		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+	}
+
+	/**
+	 * Tests that the underlying stream file is deleted if the closeAndGetHandle method fails.
+	 */
+	@Test
+	public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
+		final FileSystem fs = mock(FileSystem.class);
+		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+		final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+		when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+		doThrow(new IOException("Test IOException.")).when(outputStream).close();
+
+		AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
+			tempDirPath,
+			fs,
+			new HashSet<FsCheckpointStateOutputStream>(),
+			4,
+			0);
+
+		// this should create the underlying file stream
+		stream.write(new byte[]{1,2,3,4,5});
+
+		verify(fs).create(any(Path.class), anyBoolean());
+
+		try {
+			stream.closeAndGetHandle();
+			fail("Expected IOException");
+		} catch (IOException ioE) {
+			// expected exception
+		}
+
+		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+	}
+
+	/**
+	 * Tests that the underlying stream file is deleted if the closeAndGetPath method fails.
+	 */
+	@Test
+	public void testCleanupWhenFailingCloseAndGetPath() throws IOException {
+		final FileSystem fs = mock(FileSystem.class);
+		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+		final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+		when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+		doThrow(new IOException("Test IOException.")).when(outputStream).close();
+
+		FsStateBackend.FsCheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
+			tempDirPath,
+			fs,
+			new HashSet<FsCheckpointStateOutputStream>(),
+			4,
+			0);
+
+		// this should create the underlying file stream
+		stream.write(new byte[]{1,2,3,4,5});
+
+		verify(fs).create(any(Path.class), anyBoolean());
+
+		try {
+			stream.closeAndGetPath();
+			fail("Expected IOException");
+		} catch (IOException ioE) {
+			// expected exception
+		}
+
+		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+	}
+	
 	private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
 		AbstractStateBackend.CheckpointStateOutputStream stream =
 			new FsStateBackend.FsCheckpointStateOutputStream(
-					TEMP_DIR_PATH, FileSystem.getLocalFileSystem(),
+					tempDirPath, FileSystem.getLocalFileSystem(),
 					new HashSet<FsCheckpointStateOutputStream>(), bufferSize, threshold);
 		
 		Random rnd = new Random();

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 923943f..cb0b001 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -406,29 +406,75 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
-		final AbstractStateBackend.CheckpointStateOutputStream os =
-			this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);
+		final AbstractStateBackend.CheckpointStateOutputStream os;
 
-		final ObjectOutputStream oos = new ObjectOutputStream(os);
-		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+		try {
+			os = this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not create the checkpoint state output stream for " +
+				getOperatorName() + '.', e);
+		}
+
+		try {
+			final ObjectOutputStream oos = new ObjectOutputStream(os);
+			final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+
+			Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState();
+			List<FileInputSplit> pendingSplits = readerState.f0;
+			FileInputSplit currSplit = readerState.f1;
+			S formatState = readerState.f2;
+
+			// write the current split
+			oos.writeObject(currSplit);
+
+			// write the pending ones
+			ov.writeInt(pendingSplits.size());
+			for (FileInputSplit split : pendingSplits) {
+				oos.writeObject(split);
+			}
 
-		Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState();
-		List<FileInputSplit> pendingSplits = readerState.f0;
-		FileInputSplit currSplit = readerState.f1;
-		S formatState = readerState.f2;
+			// write the state of the reading channel
+			oos.writeObject(formatState);
 
-		// write the current split
-		oos.writeObject(currSplit);
+			oos.flush();
+		} catch (Exception exception) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard the stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			try {
+				// closing the checkpoint output stream should delete the written data
+				os.close();
+			} catch (Exception closingException) {
+				LOG.warn("Could not close the checkpoint state output stream belonging to " +
+					"{}. The written data might not be deleted.", getOperatorName(), closingException);
+			}
+
+			throw new Exception("Could not write the stream task state of " + getOperatorName()
+				+ " into the checkpoint state output view.", exception);
+		}
+
+		try {
+			taskState.setOperatorState(os.closeAndGetHandle());
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
 
-		// write the pending ones
-		ov.writeInt(pendingSplits.size());
-		for (FileInputSplit split : pendingSplits) {
-			oos.writeObject(split);
+			throw new Exception("Could not close and get state handle from checkpoint state " +
+				"output stream belonging to " + getOperatorName() + '.', e);
 		}
 
-		// write the state of the reading channel
-		oos.writeObject(formatState);
-		taskState.setOperatorState(os.closeAndGetHandle());
 		return taskState;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index d51c320..41b5f7e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -181,10 +181,16 @@ public abstract class AbstractStreamOperator<OUT>
 		StreamTaskState state = new StreamTaskState();
 
 		if (stateBackend != null) {
-			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
-				stateBackend.snapshotPartitionedState(checkpointId, timestamp);
-			if (partitionedSnapshots != null) {
-				state.setKvStates(partitionedSnapshots);
+			try {
+				HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
+					stateBackend.snapshotPartitionedState(checkpointId, timestamp);
+
+				if (partitionedSnapshots != null) {
+					state.setKvStates(partitionedSnapshots);
+				}
+			} catch (Exception e) {
+				throw new Exception("Failed to snapshot partitioned state for operator " +
+					getOperatorName() + '.', e);
 			}
 		}
 
@@ -234,6 +240,21 @@ public abstract class AbstractStreamOperator<OUT>
 	public ClassLoader getUserCodeClassloader() {
 		return container.getUserCodeClassLoader();
 	}
+
+	/**
+	 * Returns the operator name. If the runtime context has been set, then the task name with
+	 * subtask index is returned. Otherwise, the simple class name is returned.
+	 *
+	 * @return If runtime context is set, then return task name with subtask index. Otherwise return
+	 * 			simple class name.
+	 */
+	protected String getOperatorName() {
+		if (runtimeContext != null) {
+			return runtimeContext.getTaskNameWithSubtasks();
+		} else {
+			return getClass().getSimpleName();
+		}
+	}
 	
 	/**
 	 * Returns a context that allows the operator to query information about the execution and also

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 1ddd934..2fe1326 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -129,7 +129,14 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends
 				udfState = chkFunction.snapshotState(checkpointId, timestamp);
 			} 
 			catch (Exception e) {
-				throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
+				try {
+					state.discardState();
+				} catch (Exception discardException) {
+					LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+				}
+
+				throw new Exception("Failed to snapshot function state of " +
+					getOperatorName() + '.', e);
 			}
 			
 			if (udfState != null) {
@@ -140,8 +147,14 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends
 					state.setFunctionState(handle);
 				}
 				catch (Exception e) {
-					throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
-							+ e.getMessage(), e);
+					try {
+						state.discardState();
+					} catch (Exception discardException) {
+						LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+					}
+
+					throw new Exception("Failed to add the function state snapshot of " +
+						getOperatorName() + " to the checkpoint.", e);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index afc28f4..497f85a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -89,14 +89,32 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
 		//only add handle if a new OperatorState was created since the last snapshot
 		if (out != null) {
-			StateHandle<DataInputView> handle = out.closeAndGetHandle();
+			StateHandle<DataInputView> handle;
+
+			try {
+				handle = out.closeAndGetHandle();
+			} catch (Exception e) {
+				throw new Exception("Could not close and get state handle from checkpoint " +
+					"state output view belonging to " +
+					getOperatorName() + '.', e);
+			}
+
 			if (state.pendingHandles.containsKey(checkpointId)) {
 				//we already have a checkpoint stored for that ID that may have been partially written,
 				//so we discard this "alternate version" and use the stored checkpoint
-				handle.discardState();
+				try {
+					handle.discardState();
+				} catch (Exception exception) {
+					LOG.warn("Could not discard state handle for checkpoint {} of {}, " +
+						"which has already been stored.", checkpointId,
+						getOperatorName(), exception);
+				}
 			} else {
 				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
 			}
+
+			// only set the out stream to null if we could obtain a state handle;
+			// otherwise we might lose some data if failing checkpoints are allowed
 			out = null;
 		}
 	}
@@ -104,7 +122,20 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	@Override
 	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-		saveHandleInState(checkpointId, timestamp);
+
+		try {
+			saveHandleInState(checkpointId, timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not save handle in state of " +
+				getOperatorName() + '.', e);
+		}
+
 		taskState.setFunctionState(state);
 		return taskState;
 	}

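The "only set the out stream to null" comment encodes an invariant worth spelling out: the stream reference is the last pointer to not-yet-confirmed data, so it may only be dropped once a handle has been obtained. A sketch of that invariant, with Stream and Handle as placeholder types:

    interface Handle { }

    interface Stream { Handle closeAndGetHandle() throws Exception; }

    final class WriteAheadHandleSketch {
        private Stream out;      // buffers records written since the last checkpoint
        private Handle pending;  // handle to the buffered records

        void saveHandle() throws Exception {
            if (out != null) {
                Handle handle;
                try {
                    handle = out.closeAndGetHandle();
                } catch (Exception e) {
                    // keep 'out' set: clearing it here would silently drop the
                    // buffered data when a failing checkpoint is tolerated
                    throw new Exception("Could not obtain a state handle.", e);
                }
                pending = handle;
                out = null;      // only safe once the handle has been obtained
            }
        }
    }
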
http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index fdc8117..c84bc7f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -248,15 +248,58 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 		
 		// we write the panes with the key/value maps into the stream, as well as when this state
-		// should have triggered and slided
-		AbstractStateBackend.CheckpointStateOutputView out =
-				getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		// should have triggered and slid
+		AbstractStateBackend.CheckpointStateOutputView out;
+
+		try {
+			out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not create checkpoint state output view to write the " +
+				getOperatorName() + " state into.", e);
+		}
+
+		try {
+			out.writeLong(nextEvaluationTime);
+			out.writeLong(nextSlideTime);
+			panes.writeToOutput(out, keySerializer, stateTypeSerializer);
+		} catch (Exception ioE) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			try {
+				// closing the checkpoint output view stream should delete the written data
+				out.close();
+			} catch (Exception closingException) {
+				LOG.warn("Could not close the checkpoint state output view for {}. The written data " +
+					"might not be deleted.", getOperatorName(), closingException);
+			}
+
+			throw new Exception("Could not write the state for " + getOperatorName() +
+				" into the checkpoint state output view.", ioE);
+		}
+
+		try {
+			taskState.setOperatorState(out.closeAndGetHandle());
+		} catch (Exception ioE) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not close and obtain the state handle from the checkpoint " +
+				"output view of " + getOperatorName() + '.', ioE);
+		}
 
-		out.writeLong(nextEvaluationTime);
-		out.writeLong(nextSlideTime);
-		panes.writeToOutput(out, keySerializer, stateTypeSerializer);
-		
-		taskState.setOperatorState(out.closeAndGetHandle());
 		return taskState;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 2434843..16784d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -871,12 +871,54 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
-		AbstractStateBackend.CheckpointStateOutputView out =
-			getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		final AbstractStateBackend.CheckpointStateOutputView out;
 
-		snapshotTimers(out);
+		try {
+			out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		} catch (Exception ioE) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not create checkpoint state output view for " +
+				getOperatorName() + '.', ioE);
+		}
+
+		try {
+			snapshotTimers(out);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			try {
+				// closing the checkpoint output stream should delete the written data
+				out.close();
+			} catch (Exception closingException) {
+				LOG.warn("Could not close the checkpoint state output view of {}. The written data " +
+					"might not be deleted.", getOperatorName(), closingException);
+			}
+
+			throw new Exception("Could not snapshot the timers of window operator " +
+				getOperatorName() + '.', e);
+		}
 
-		taskState.setOperatorState(out.closeAndGetHandle());
+		try {
+			taskState.setOperatorState(out.closeAndGetHandle());
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not close and get state handle from checkpoint output view of " +
+				getOperatorName() + '.', e);
+		}
 
 		return taskState;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 99df060..9531974 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -602,7 +603,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			throw e;
 		}
 		catch (Exception e) {
-			throw new Exception("Error while performing a checkpoint", e);
+			throw new Exception("Error while performing checkpoint " + checkpointId + '.', e);
 		}
 	}
 
@@ -671,7 +672,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				if (allStates.isEmpty()) {
 					getEnvironment().acknowledgeCheckpoint(checkpointId);
 				} else if (!hasAsyncStates) {
-					this.lastCheckpointSize = allStates.getStateSize();
+					try {
+						this.lastCheckpointSize = allStates.getStateSize();
+					} catch (Exception ioE) {
+						LOG.warn("Could not calculate the total state size for checkpoint {}.", checkpointId, ioE);
+					}
+
 					getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
 				} else {
 					// start a Thread that does the asynchronous materialization and
@@ -694,11 +700,26 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
 				// yet be created
 				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointId);
+
+				Exception exception = null;
+
 				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-					output.writeEventToAllChannels(message);
+					try {
+						output.writeEventToAllChannels(message);
+					} catch (IOException ioE) {
+						if (exception == null) {
+							exception = new Exception("Could not send CancelCheckpointMarker to downstream tasks.", ioE);
+						} else {
+							exception.addSuppressed(ioE);
+						}
+					}
 				}
 
-				return false;
+				if (exception == null) {
+					return false;
+				} else {
+					throw exception;
+				}
 			}
 		}
 	}

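The broadcast loop above is the collect-and-rethrow idiom: keep notifying all writers, remember the first failure, attach later ones via addSuppressed, and fail once at the end. Distilled into a sketch with Closeable standing in for ResultPartitionWriter:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.List;

    final class BroadcastAllSketch {
        void broadcast(List<Closeable> outputs) throws Exception {
            Exception exception = null;

            for (Closeable output : outputs) {
                try {
                    output.close();  // stands in for writeEventToAllChannels(message)
                } catch (IOException ioE) {
                    if (exception == null) {
                        exception = new Exception("Could not notify a downstream task.", ioE);
                    } else {
                        exception.addSuppressed(ioE);
                    }
                }
            }

            if (exception != null) {
                throw exception;
            }
        }
    }
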
http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
index ae85d86..4a208a4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
@@ -37,7 +37,7 @@ public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
 	/** The states for all operator */
 	private final StreamTaskState[] states;
 
-	public StreamTaskStateList(StreamTaskState[] states) throws Exception {
+	public StreamTaskStateList(StreamTaskState[] states) {
 		this.states = states;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
index f0113d1..184e67b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
@@ -17,17 +17,28 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
  * Tests for {@link StreamMap}. These test that:
@@ -38,6 +49,8 @@ import org.junit.Test;
  *     <li>Watermarks are correctly forwarded</li>
  * </ul>
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AbstractStreamOperator.class)
 public class StreamMapTest {
 
 	private static class Map implements MapFunction<Integer, String> {
@@ -91,6 +104,51 @@ public class StreamMapTest {
 		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
 	}
 
+	@Test
+	public void testFailingSnapshot() throws Exception {
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTaskState streamTaskState = mock(StreamTaskState.class);
+		whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
+
+		StreamMap<String, String> operator = new StreamMap<>(new TestCheckpointedMapFunction());
+
+		OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
+
+		testHarness.open();
+
+		try {
+			testHarness.snapshot(checkpointId, timestamp);
+
+			fail("Expected exception here.");
+		} catch (Exception expected) {
+			// expected exception
+		}
+
+		verify(streamTaskState).discardState();
+	}
+
+	private static class TestCheckpointedMapFunction implements MapFunction<String, String>, Checkpointed<String> {
+
+		private static final long serialVersionUID = 2353250741656753525L;
+
+		@Override
+		public String map(String value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			throw new IOException("Test exception.");
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			// noop
+		}
+	}
+
 	// This must only be used in one test, otherwise the static fields will be changed
 	// by several tests concurrently
 	private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
@@ -103,7 +161,7 @@ public class StreamMapTest {
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 			if (closeCalled) {
-				Assert.fail("Close called before open.");
+				fail("Close called before open.");
 			}
 			openCalled = true;
 		}
@@ -112,7 +170,7 @@ public class StreamMapTest {
 		public void close() throws Exception {
 			super.close();
 			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
+				fail("Open was not called before close.");
 			}
 			closeCalled = true;
 		}
@@ -120,7 +178,7 @@ public class StreamMapTest {
 		@Override
 		public String map(String value) throws Exception {
 			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
+				fail("Open was not called before run.");
 			}
 			return value;
 		}

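These tests hinge on a PowerMock detail that is easy to miss: whenNew intercepts constructor calls made by the class named in @PrepareForTest, which is why the test prepares AbstractStreamOperator (the caller of new StreamTaskState()) rather than StreamTaskState itself. A minimal, self-contained sketch of the mechanism, with hypothetical Caller/Created classes:

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    import static org.junit.Assert.assertSame;
    import static org.mockito.Mockito.mock;
    import static org.powermock.api.mockito.PowerMockito.whenNew;

    @RunWith(PowerMockRunner.class)
    @PrepareForTest(WhenNewSketchTest.Caller.class)  // the class that calls 'new'
    public class WhenNewSketchTest {

        public static class Created { }

        public static class Caller {
            Created make() { return new Created(); }
        }

        @Test
        public void interceptsConstructor() throws Exception {
            Created mocked = mock(Created.class);
            whenNew(Created.class).withAnyArguments().thenReturn(mocked);

            assertSame(mocked, new Caller().make());
        }
    }
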
http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index dfa353c..72939d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -32,13 +32,14 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -70,7 +71,11 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -81,7 +86,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AbstractStreamOperator.class)
 public class WindowOperatorTest {
 
 	// For counting if close() is called the correct number of times on the SumReducer
@@ -2391,6 +2405,178 @@ public class WindowOperatorTest {
 		testHarness.close();
 	}
 
+	/**
+	 * Tests that the StreamTaskState and the CheckpointStateOutputStream are discarded and closed
+	 * in case of a failure while writing to the CheckpointStateOutputStream.
+	 */
+	@Test
+	public void testCleanupInCaseOfFailingSnapshotCall() throws Exception {
+		final int WINDOW_SIZE = 10;
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTaskState streamTaskState = mock(StreamTaskState.class);
+		AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
+		AbstractStateBackend.CheckpointStateOutputStream outputStream = mock(AbstractStateBackend.CheckpointStateOutputStream.class);
+
+		doThrow(new IOException("Test Exception")).when(outputStream).write(anyInt());
+
+		when(stateBackend.createCheckpointStateOutputView(anyLong(), anyLong())).thenCallRealMethod();
+		when(stateBackend.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outputStream);
+		whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+			new SumReducer(),
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
+			PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+			0);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
+			"Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.setStateBackend(stateBackend);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		try {
+			testHarness.snapshot(checkpointId, timestamp);
+			fail("Expected Exception here.");
+		} catch (Exception expected) {
+			// expected the exception here
+		}
+
+		verify(outputStream).close();
+		verify(streamTaskState).discardState();
+	}
+
+	/**
+	 * Tests that the StreamTaskState is discarded in case of a failure while obtaining the
+	 * CheckpointStateOutputStream.
+	 */
+	@Test
+	public void testCleanupInCaseOfFailingCheckpointStateOutputStreamCreation() throws Exception {
+		final int WINDOW_SIZE = 10;
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTaskState streamTaskState = mock(StreamTaskState.class);
+		AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
+
+		when(stateBackend.createCheckpointStateOutputView(anyLong(), anyLong())).thenThrow(new IOException("Test Exception"));
+		whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+			new SumReducer(),
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
+			PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+			0);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
+			"Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.setStateBackend(stateBackend);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		try {
+			testHarness.snapshot(checkpointId, timestamp);
+			fail("Expected Exception here.");
+		} catch (Exception expected) {
+			// expected the exception here
+		}
+
+		verify(streamTaskState).discardState();
+	}
+
+	/**
+	 * Tests that the StreamTaskState is discarded in case of a failure while closing and getting
+	 * the state handle from the CheckpointStateOutputStream.
+	 */
+	@Test
+	public void testCleanupInCaseOfFailingCloseAndGetHandleInSnapshotMethod() throws Exception {
+		final int WINDOW_SIZE = 10;
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTaskState streamTaskState = mock(StreamTaskState.class);
+		AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
+		AbstractStateBackend.CheckpointStateOutputStream outputStream = mock(AbstractStateBackend.CheckpointStateOutputStream.class);
+
+		doThrow(new IOException("Test Exception")).when(outputStream).closeAndGetHandle();
+
+		when(stateBackend.createCheckpointStateOutputView(anyLong(), anyLong())).thenCallRealMethod();
+		when(stateBackend.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outputStream);
+		whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+			new SumReducer(),
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
+			PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+			0);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
+			"Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.setStateBackend(stateBackend);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		try {
+			testHarness.snapshot(checkpointId, timestamp);
+			fail("Expected Exception here.");
+		} catch (Exception expected) {
+			// expected the exception here
+		}
+
+		verify(outputStream).closeAndGetHandle();
+		verify(streamTaskState).discardState();
+	}
+
 	// ------------------------------------------------------------------------
 	//  UDFs
 	// ------------------------------------------------------------------------


[2/3] flink git commit: [FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator

Posted by tr...@apache.org.
[FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator

Handle exceptions from the CompletedCheckpointStore properly in the CheckpointCoordinator. This
means that in case of an exception, the completed checkpoint is properly cleaned up and the
triggering of subsequent checkpoints is still started.

Fix failing SavepointCoordinatorTest
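
The behaviour described above boils down to: if storing the completed checkpoint fails, discard it, rethrow, and in either case resume the normal checkpointing flow. A minimal sketch of that shape (placeholder types, not the actual CheckpointCoordinator code):

    interface Completed { void discard() throws Exception; }

    interface Store { void add(Completed checkpoint) throws Exception; }

    final class CompletionSketch {
        void complete(Completed completed, Store store, Runnable triggerQueuedRequests) throws Exception {
            try {
                store.add(completed);
            } catch (Exception e) {
                try {
                    completed.discard();  // clean up the checkpoint that could not be stored
                } catch (Exception discardException) {
                    // log only; best-effort cleanup
                }
                throw new Exception("Could not store the completed checkpoint.", e);
            } finally {
                // subsequent checkpoints must be triggered in either case
                triggerQueuedRequests.run();
            }
        }
    }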


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b734d7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b734d7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b734d7b

Branch: refs/heads/release-1.1
Commit: 4b734d7b8726200e5293c32f2cb9e8c77db4d378
Parents: d314bc5
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 24 18:16:28 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 1 18:00:53 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 143 +++++++++++-------
 .../runtime/checkpoint/CheckpointException.java |  35 +++++
 .../runtime/checkpoint/PendingCheckpoint.java   |  16 +--
 .../CheckpointCoordinatorFailureTest.java       | 144 +++++++++++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   |   4 +-
 5 files changed, 275 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0d09922..74e6d08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -671,24 +671,17 @@ public class CheckpointCoordinator {
 	 *
 	 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
 	 */
-	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
+	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
 		if (shutdown || message == null) {
 			return false;
 		}
 		if (!job.equals(message.getJob())) {
-			LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
+			LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
 			return false;
 		}
 
 		final long checkpointId = message.getCheckpointId();
 
-		CompletedCheckpoint completed = null;
-		PendingCheckpoint checkpoint;
-
-		// Flag indicating whether the ack message was for a known pending
-		// checkpoint.
-		boolean isPendingCheckpoint;
-
 		synchronized (lock) {
 			// we need to check inside the lock for being shutdown as well, otherwise we
 			// get races and invalid error log messages
@@ -696,45 +689,16 @@ public class CheckpointCoordinator {
 				return false;
 			}
 
-			checkpoint = pendingCheckpoints.get(checkpointId);
+			final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
 
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
-				isPendingCheckpoint = true;
 
 				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
 					case SUCCESS:
 
 						if (checkpoint.isFullyAcknowledged()) {
-
-							lastCheckpointCompletionNanos = System.nanoTime();
-							completed = checkpoint.finalizeCheckpoint();
-
-							completedCheckpointStore.addCheckpoint(completed);
-
-							LOG.info("Completed checkpoint " + checkpointId + " (in " +
-								completed.getDuration() + " ms)");
-
-							if (LOG.isDebugEnabled()) {
-								StringBuilder builder = new StringBuilder();
-								builder.append("Checkpoint state: ");
-								for (TaskState state : completed.getTaskStates().values()) {
-									builder.append(state);
-									builder.append(", ");
-								}
-								// Remove last two chars ", "
-								builder.delete(builder.length() - 2, builder.length());
-
-								LOG.debug(builder.toString());
-							}
-
-							pendingCheckpoints.remove(checkpointId);
-							rememberRecentCheckpointId(checkpointId);
-
-							dropSubsumedCheckpoints(completed.getTimestamp());
-
-							onFullyAcknowledgedCheckpoint(completed);
-
-							triggerQueuedRequests();
+							completePendingCheckpoint(checkpoint);
+							
 						}
 						break;
 					case DUPLICATE:
@@ -757,6 +721,8 @@ public class CheckpointCoordinator {
 
 						discardState(message.getState());
 				}
+
+				return true;
 			}
 			else if (checkpoint != null) {
 				// this should not happen
@@ -764,39 +730,108 @@ public class CheckpointCoordinator {
 						"Received message for discarded but non-removed checkpoint " + checkpointId);
 			}
 			else {
+				boolean wasPendingCheckpoint;
+
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
-					isPendingCheckpoint = true;
+					wasPendingCheckpoint = true;
 					LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
 				}
 				else {
 					LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
-					isPendingCheckpoint = false;
+					wasPendingCheckpoint = false;
 				}
 
 				// try to discard the state so that we don't have lingering state lying around
 				discardState(message.getState());
+
+				return wasPendingCheckpoint;
+			}
+		}
+	}
+
+	/**
+	 * Try to complete the given pending checkpoint.
+	 *
+	 * Important: This method should only be called in the checkpoint lock scope.
+	 *
+	 * @param pendingCheckpoint to complete
+	 * @throws CheckpointException if the completion failed
+	 */
+	private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
+		// we have to be called in the checkpoint lock scope
+		assert(Thread.holdsLock(lock));
+
+		final long checkpointId = pendingCheckpoint.getCheckpointId();
+		CompletedCheckpoint completedCheckpoint = null;
+
+		try {
+			completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();			
+
+			completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+			rememberRecentCheckpointId(checkpointId);
+			dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
+
+			onFullyAcknowledgedCheckpoint(completedCheckpoint);
+		} catch (Exception exception) {
+			// abort the current pending checkpoint if it has not been discarded yet
+			if (!pendingCheckpoint.isDiscarded()) {
+				pendingCheckpoint.discard(userClassLoader);
+			}
+
+			if (completedCheckpoint != null) {
+				// we failed to store the completed checkpoint. Let's clean up
+				final CompletedCheckpoint cc = completedCheckpoint;
+
+				executor.execute(new Runnable() {
+					@Override
+					public void run() {
+						try {
+							cc.discard(userClassLoader);
+						} catch (Exception nestedException) {
+							LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException);
+						}
+					}
+				});
 			}
+
+			throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
+		} finally {
+			pendingCheckpoints.remove(checkpointId);
+
+			triggerQueuedRequests();
 		}
+		
+		lastCheckpointCompletionNanos = System.nanoTime();
+
+		LOG.info("Completed checkpoint {} (in {} ms).", checkpointId, completedCheckpoint.getDuration());
 
-		// send the confirmation messages to the necessary targets. we do this here
-		// to be outside the lock scope
-		if (completed != null) {
-			final long timestamp = completed.getTimestamp();
+		if (LOG.isDebugEnabled()) {
+			StringBuilder builder = new StringBuilder();
+			builder.append("Checkpoint state: ");
+			for (TaskState state : completedCheckpoint.getTaskStates().values()) {
+				builder.append(state);
+				builder.append(", ");
+			}
+			// Remove last two chars ", "
+			builder.delete(builder.length() - 2, builder.length());
+
+			LOG.debug(builder.toString());
+		}
 
-			for (ExecutionVertex ev : tasksToCommitTo) {
-				Execution ee = ev.getCurrentExecutionAttempt();
-				if (ee != null) {
+		final long timestamp = completedCheckpoint.getTimestamp();
+
+		for (ExecutionVertex ev : tasksToCommitTo) {
+			Execution ee = ev.getCurrentExecutionAttempt();
+			if (ee != null) {
 					ExecutionAttemptID attemptId = ee.getAttemptId();
 					NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
 					ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
-				}
 			}
-
-			statsTracker.onCompletedCheckpoint(completed);
 		}
 
-		return isPendingCheckpoint;
+		statsTracker.onCompletedCheckpoint(completedCheckpoint);
 	}
 
 	private void rememberRecentCheckpointId(long id) {
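
Note on the hunk above: completePendingCheckpoint does not synchronize again but asserts that the
caller already holds the checkpoint lock (assert(Thread.holdsLock(lock))). A minimal, hypothetical
illustration of that idiom (not from the patch):

    // With assertions enabled (-ea), calling incrementUnderLock() outside the
    // lock fails fast instead of silently racing.
    final class LockScoped {
        private final Object lock = new Object();
        private int counter;

        void increment() {
            synchronized (lock) {
                incrementUnderLock();
            }
        }

        private void incrementUnderLock() {
            assert Thread.holdsLock(lock) : "caller must hold 'lock'";
            counter++;
        }
    }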

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
new file mode 100644
index 0000000..707878c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Base class for checkpoint related exceptions.
+ */
+public class CheckpointException extends Exception {
+
+	private static final long serialVersionUID = -4341865597039002540L;
+
+	public CheckpointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CheckpointException(String message) {
+		super(message);
+	}
+}
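
With receiveAcknowledgeMessage now declaring CheckpointException, callers can distinguish
checkpoint completion failures from other errors. A hedged caller-side sketch (coordinator,
ackMessage and LOG are assumed to exist):

    try {
        coordinator.receiveAcknowledgeMessage(ackMessage);
    } catch (CheckpointException e) {
        // the cause carries the underlying completed-checkpoint-store failure
        LOG.error("Could not complete the checkpoint.", e);
    }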

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 22ba9f2..6f185bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -129,12 +129,10 @@ public class PendingCheckpoint {
 		return discarded;
 	}
 	
-	public CompletedCheckpoint finalizeCheckpoint() throws Exception {
+	public CompletedCheckpoint finalizeCheckpoint() {
 		synchronized (lock) {
-			if (discarded) {
-				throw new IllegalStateException("pending checkpoint is discarded");
-			}
-			if (notYetAcknowledgedTasks.isEmpty()) {
+			Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
+
 				CompletedCheckpoint completed =  new CompletedCheckpoint(
 					jobId,
 					checkpointId,
@@ -144,10 +142,6 @@ public class PendingCheckpoint {
 				dispose(null, false);
 				
 				return completed;
-			}
-			else {
-				throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
-			}
 		}
 	}
 	
@@ -237,10 +231,9 @@ public class PendingCheckpoint {
 
 	private void dispose(final ClassLoader userClassLoader, boolean releaseState) {
 		synchronized (lock) {
-			discarded = true;
 			numAcknowledgedTasks = -1;
 			try {
-				if (releaseState) {
+				if (!discarded && releaseState) {
 					executor.execute(new Runnable() {
 						@Override
 						public void run() {
@@ -257,6 +250,7 @@ public class PendingCheckpoint {
 
 				}
 			} finally {
+				discarded = true;
 				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();
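
The dispose change above makes discarding effectively idempotent: task state is released at most
once (only when the checkpoint was not already discarded), and the discarded flag is now set in the
finally block, so it holds even if releasing the state throws. Condensed from the patched method:

    synchronized (lock) {
        numAcknowledgedTasks = -1;
        try {
            if (!discarded && releaseState) {
                // asynchronously discard the collected task states
            }
        } finally {
            discarded = true; // set last, so even a failed release marks the checkpoint as discarded
            taskStates.clear();
            notYetAcknowledgedTasks.clear();
            acknowledgedTasks.clear();
        }
    }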

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
new file mode 100644
index 0000000..e74bbd8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.util.TestExecutors;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PendingCheckpoint.class)
+public class CheckpointCoordinatorFailureTest extends TestLogger {
+
+	/**
+	 * Tests that a failure while storing a completed checkpoint in the completed checkpoint store
+	 * will properly fail the originating pending checkpoint and clean up the completed checkpoint.
+	 */
+	@Test
+	public void testFailingCompletedCheckpointStoreAdd() throws Exception {
+		JobID jid = new JobID();
+
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId);
+
+		final long triggerTimestamp = 1L;
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			42,
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			getClass().getClassLoader(),
+			new StandaloneCheckpointIDCounter(),
+			new FailingCompletedCheckpointStore(),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			TestExecutors.directExecutor());
+
+		coord.triggerCheckpoint(triggerTimestamp);
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+
+		assertFalse(pendingCheckpoint.isDiscarded());
+
+		final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+
+		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId);
+
+		CompletedCheckpoint completedCheckpoint = mock(CompletedCheckpoint.class);
+		PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint);
+
+		try {
+			coord.receiveAcknowledgeMessage(acknowledgeMessage);
+			fail("Expected a checkpoint exception because the completed checkpoint store could not " +
+				"store the completed checkpoint.");
+		} catch (CheckpointException e) {
+			// ignore because we expected this exception
+		}
+
+		// make sure that the pending checkpoint has been discarded after we could not complete it
+		assertTrue(pendingCheckpoint.isDiscarded());
+
+		verify(completedCheckpoint).discard(getClass().getClassLoader());
+	}
+
+	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
+
+		@Override
+		public void recover() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
+			throw new Exception("The failing completed checkpoint store failed again... :-(");
+		}
+
+		@Override
+		public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void shutdown() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void suspend() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public int getNumberOfRetainedCheckpoints() {
+			return -1;
+		}
+	}
+}
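
Because PendingCheckpoint#finalizeCheckpoint constructs the CompletedCheckpoint internally, the
test cannot inject a mock directly; PowerMock's constructor interception does it instead. The
pieces that cooperate in the test above:

    @RunWith(PowerMockRunner.class)           // let PowerMock instrument classes
    @PrepareForTest(PendingCheckpoint.class)  // the class whose 'new CompletedCheckpoint(...)' is intercepted
    // ... and inside the test:
    PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint);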

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9159711..d02e48f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1500,7 +1500,7 @@ public class CheckpointCoordinatorTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+	static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
 		return mockExecutionVertex(attemptID, 1);
 	}
 
@@ -1508,7 +1508,7 @@ public class CheckpointCoordinatorTest {
 		return mockExecutionVertex(attemptId, ExecutionState.RUNNING, parallelism);
 	}
 
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, 
+	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID,
 														ExecutionState state, ExecutionState ... successiveStates) {
 		return mockExecutionVertex(attemptID, state, 1, successiveStates);
 	}