You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:01 UTC

[10/47] flink git commit: [FLINK-2354] [runtime] Remove state changing futures in JobManager

[FLINK-2354] [runtime] Remove state changing futures in JobManager

Internal actor states must only be modified within the actor thread.
This avoids all the well-known issues coming with concurrency.

Fix RemoveCachedJob by introducing RemoveJob

Fix JobManagerITCase

Add removeJob which maintains the job in the SubmittedJobGraphStore

Make revokeLeadership not remove the jobs from the state backend

Fix shading problem with curator by hiding CuratorFramework in ChaosMonkeyITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2989f2b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2989f2b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2989f2b

Branch: refs/heads/master
Commit: c2989f2b1839055858e4b328473d0a8313094ff3
Parents: 73c73e9
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 9 00:50:07 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 20 00:16:52 2015 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |   4 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 262 +++++++++++--------
 .../runtime/messages/JobManagerMessages.scala   |  17 ++
 ...ManagerSubmittedJobGraphsRecoveryITCase.java |   5 +-
 .../zookeeper/ZooKeeperTestEnvironment.java     |  10 +
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |   2 +-
 .../flink-shaded-curator-recipes/pom.xml        |  78 ++++++
 .../flink-shaded-curator-test/pom.xml           |  86 ++++++
 flink-shaded-curator/pom.xml                    |  82 ++----
 flink-tests/pom.xml                             |   7 +
 .../flink/test/recovery/ChaosMonkeyITCase.java  |  15 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  13 +-
 .../flink/yarn/TestingYarnJobManager.scala      |  10 +-
 14 files changed, 404 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f79c5ed..9db82b2 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -182,7 +182,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-curator</artifactId>
+			<artifactId>flink-shaded-curator-recipes</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		
@@ -417,7 +417,7 @@ under the License.
 						<configuration>
 							<artifactSet>
 								<includes combine.children="append">
-									<include>org.apache.flink:flink-shaded-curator</include>
+									<include>org.apache.flink:flink-shaded-curator-recipes</include>
 								</includes>
 							</artifactSet>
 							<relocations combine.children="append">

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f3e4054..eef28d8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -64,7 +64,6 @@ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMess
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
 
 import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
@@ -127,6 +126,9 @@ class JobManager(
 
   var leaderSessionID: Option[UUID] = None
 
+  /** Futures which have to be completed before terminating the job manager */
+  var futuresToComplete: Option[Seq[Future[Unit]]] = None
+
   /**
    * Run when the job manager is started. Simply logs an informational message.
    * The method also starts the leader election service.
@@ -163,7 +165,16 @@ class JobManager(
   override def postStop(): Unit = {
     log.info(s"Stopping JobManager ${getAddress}.")
 
-    cancelAndClearEverything(new Exception("The JobManager is shutting down."))
+    val newFuturesToComplete = cancelAndClearEverything(
+      new Exception("The JobManager is shutting down."),
+      true)
+
+    implicit val executionContext = context.dispatcher
+
+    val futureToComplete = Future.sequence(
+      futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
+
+    Await.ready(futureToComplete, timeout)
 
     // disconnect the registered task managers
     instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -235,9 +246,11 @@ class JobManager(
     case RevokeLeadership =>
       log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
 
-      future {
-        cancelAndClearEverything(new Exception("JobManager is no longer the leader."))
-      }(context.dispatcher)
+      val newFuturesToComplete = cancelAndClearEverything(
+        new Exception("JobManager is no longer the leader."),
+        false)
+
+      futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
 
       // disconnect the registered task managers
       instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -315,9 +328,15 @@ class JobManager(
       val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
         jobGraph.getSessionTimeout)
 
-      future {
-        submitJob(jobGraph, jobInfo)
-      }(context.dispatcher)
+      submitJob(jobGraph, jobInfo)
+
+    case RecoverSubmittedJob(submittedJobGraph) =>
+      if (!currentJobs.contains(submittedJobGraph.getJobId)) {
+        submitJob(
+          submittedJobGraph.getJobGraph(),
+          submittedJobGraph.getJobInfo(),
+          isRecovery = true)
+      }
 
     case RecoverJob(jobId) =>
       future {
@@ -328,19 +347,18 @@ class JobManager(
 
           log.info(s"Attempting to recover job $jobId.")
 
-          val jobGraph = submittedJobGraphs.recoverJobGraph(jobId)
+          val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
 
-          if (jobGraph.isDefined) {
-            if (!leaderElectionService.hasLeadership()) {
-              // we've lost leadership. mission: abort.
-              log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
-            }
-            else {
-              recoverJobGraph(jobGraph.get)
-            }
-          }
-          else {
-            log.warn(s"Failed to recover job graph ${jobId}.")
+          submittedJobGraphOption match {
+            case Some(submittedJobGraph) =>
+              if (!leaderElectionService.hasLeadership()) {
+                // we've lost leadership. mission: abort.
+                log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
+              }
+              else {
+                self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
+              }
+            case None => log.warn(s"Failed to recover job graph $jobId.")
           }
         }
       }(context.dispatcher)
@@ -362,7 +380,10 @@ class JobManager(
           else {
             log.debug(s"Attempting to recover ${jobGraphs.size} job graphs.")
 
-            jobGraphs.foreach(recoverJobGraph(_))
+            jobGraphs.foreach{
+              submittedJobGraph =>
+                self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
+            }
           }
         }
       }(context.dispatcher)
@@ -473,7 +494,7 @@ class JobManager(
           if (newJobStatus.isTerminalState()) {
             jobInfo.end = timeStamp
 
-            future {
+            future{
               // TODO If removing the JobGraph from the SubmittedJobGraphsStore fails, the job will
               // linger around and potentially be recovered at a later time. There is nothing we
               // can do about that, but it should be communicated with the Client.
@@ -483,11 +504,11 @@ class JobManager(
                 context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
                   // remove only if no activity occurred in the meantime
                   if (lastActivity == jobInfo.lastActive) {
-                    removeJob(jobID)
+                    self ! decorateMessage(RemoveJob(jobID, true))
                   }
-                }
+                }(context.dispatcher)
               } else {
-                removeJob(jobID)
+                self ! decorateMessage(RemoveJob(jobID, true))
               }
 
               // is the client waiting for the job result?
@@ -539,9 +560,7 @@ class JobManager(
             }(context.dispatcher)
           }
         case None =>
-          future {
-            removeJob(jobID)
-          }(context.dispatcher)
+          self ! decorateMessage(RemoveJob(jobID, true))
       }
 
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
@@ -646,9 +665,7 @@ class JobManager(
     case Heartbeat(instanceID, metricsReport, accumulators) =>
       log.debug(s"Received hearbeat message from $instanceID.")
 
-      Future {
-        updateAccumulators(accumulators)
-      }(context.dispatcher)
+      updateAccumulators(accumulators)
 
       instanceManager.reportHeartBeat(instanceID, metricsReport)
 
@@ -671,11 +688,26 @@ class JobManager(
     case RequestJobManagerStatus =>
       sender() ! decorateMessage(JobManagerStatusAlive)
 
+    case RemoveJob(jobID, clearPersistedJob) =>
+      currentJobs.get(jobID) match {
+        case Some((graph, info)) =>
+            removeJob(graph.getJobID, clearPersistedJob) match {
+              case Some(futureToComplete) =>
+                futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
+              case None =>
+            }
+        case None =>
+      }
+
     case RemoveCachedJob(jobID) =>
       currentJobs.get(jobID) match {
         case Some((graph, info)) =>
           if (graph.getState.isTerminalState) {
-            removeJob(graph.getJobID)
+            removeJob(graph.getJobID, true) match {
+              case Some(futureToComplete) =>
+                futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
+              case None =>
+            }
           } else {
             // triggers removal upon completion of job
             info.sessionAlive = false
@@ -761,6 +793,7 @@ class JobManager(
               jobGraph.getClasspaths,
               userCodeLoader)
 
+            currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
             graph
         }
 
@@ -878,22 +911,6 @@ class JobManager(
           executionGraph.registerExecutionListener(gateway)
           executionGraph.registerJobStatusListener(gateway)
         }
-
-        if (isRecovery) {
-          executionGraph.restoreLatestCheckpointedState()
-        }
-        else {
-          submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
-        }
-
-        // Add the job graph only after everything is finished. Otherwise there can be races in
-        // tests, which check the currentJobs (for example before killing a JM).
-        if (!currentJobs.contains(jobId)) {
-          currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
-        }
-
-        // done with submitting the job
-        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
       }
       catch {
         case t: Throwable =>
@@ -916,20 +933,39 @@ class JobManager(
           return
       }
 
-      if (leaderElectionService.hasLeadership) {
-        // There is a small chance that multiple job managers schedule the same job after if they
-        // try to recover at the same time. This will eventually be noticed, but can not be ruled
-        // out from the beginning.
-
-        // NOTE: Scheduling the job for execution is a separate action from the job submission.
-        // The success of submitting the job must be independent from the success of scheduling
-        // the job.
+      // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
+      // because it is a blocking operation
+      future {
         try {
-          log.info(s"Scheduling job $jobId ($jobName).")
+          if (isRecovery) {
+            executionGraph.restoreLatestCheckpointedState()
+          }
+          else {
+            submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
+          }
 
-          executionGraph.scheduleForExecution(scheduler)
-        }
-        catch {
+          jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
+
+          if (leaderElectionService.hasLeadership) {
+            // There is a small chance that multiple job managers schedule the same job after if
+            // they try to recover at the same time. This will eventually be noticed, but can not be
+            // ruled out from the beginning.
+
+            // NOTE: Scheduling the job for execution is a separate action from the job submission.
+            // The success of submitting the job must be independent from the success of scheduling
+            // the job.
+            log.info(s"Scheduling job $jobId ($jobName).")
+
+            executionGraph.scheduleForExecution(scheduler)
+          } else {
+            // Remove the job graph. Otherwise it will be lingering around and possibly removed from
+            // ZooKeeper by this JM.
+            self ! decorateMessage(RemoveJob(jobId, false))
+
+            log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
+              "this. I am not scheduling the job for execution.")
+          }
+        } catch {
           case t: Throwable => try {
             executionGraph.fail(t)
           }
@@ -939,27 +975,6 @@ class JobManager(
             }
           }
         }
-      }
-      else {
-        // Remove the job graph. Otherwise it will be lingering around and possibly removed from
-        // ZooKeeper by this JM.
-        currentJobs.remove(jobId)
-
-        log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
-          "this. I am not scheduling the job for execution.")
-      }
-    }
-  }
-
-  /**
-   * Submits the job if it is not already one of our current jobs.
-   *
-   * @param jobGraph Job to recover
-   */
-  private def recoverJobGraph(jobGraph: SubmittedJobGraph): Unit = {
-    if (!currentJobs.contains(jobGraph.getJobId)) {
-      future {
-        submitJob(jobGraph.getJobGraph(), jobGraph.getJobInfo(), isRecovery = true)
       }(context.dispatcher)
     }
   }
@@ -1169,20 +1184,24 @@ class JobManager(
    * might block. Therefore be careful not to block the actor thread.
    *
    * @param jobID ID of the job to remove and archive
+   * @param removeJobFromStateBackend true if the job shall be archived and removed from the state
+   *                            backend
    */
-  private def removeJob(jobID: JobID): Unit = {
-    currentJobs.synchronized {
-      // Don't remove the job yet...
-      currentJobs.get(jobID) match {
-        case Some((eg, _)) =>
-          try {
-            // ...otherwise, we can have lingering resources when there is a  concurrent shutdown
-            // and the ZooKeeper client is closed. Not removing the job immediately allow the
-            // shutdown to release all resources.
-            submittedJobGraphs.removeJobGraph(jobID)
-          } catch {
-            case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
-          }
+  private def removeJob(jobID: JobID, removeJobFromStateBackend: Boolean): Option[Future[Unit]] = {
+    // Don't remove the job yet...
+    val futureOption = currentJobs.get(jobID) match {
+      case Some((eg, _)) =>
+        val result = if (removeJobFromStateBackend) {
+          val futureOption = Some(future {
+            try {
+              // ...otherwise, we can have lingering resources when there is a  concurrent shutdown
+              // and the ZooKeeper client is closed. Not removing the job immediately allow the
+              // shutdown to release all resources.
+              submittedJobGraphs.removeJobGraph(jobID)
+            } catch {
+              case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
+            }
+          }(context.dispatcher))
 
           try {
             eg.prepareForArchiving()
@@ -1193,9 +1212,15 @@ class JobManager(
               "archiving.", t)
           }
 
-          currentJobs.remove(jobID)
-        case None =>
-      }
+          futureOption
+        } else {
+          None
+        }
+
+        currentJobs.remove(jobID)
+
+        result
+      case None => None
     }
 
     try {
@@ -1204,6 +1229,8 @@ class JobManager(
       case t: Throwable =>
         log.error(s"Could not properly unregister job $jobID form the library cache.", t)
     }
+
+    futureOption
   }
 
   /** Fails all currently running jobs and empties the list of currently running jobs. If the
@@ -1211,26 +1238,35 @@ class JobManager(
     *
     * @param cause Cause for the cancelling.
     */
-  private def cancelAndClearEverything(cause: Throwable) {
-    for ((jobID, (eg, jobInfo)) <- currentJobs) {
-      try {
-        submittedJobGraphs.removeJobGraph(jobID)
-      }
-      catch {
-        case t: Throwable => {
-          log.error("Error during submitted job graph clean up.", t)
+  private def cancelAndClearEverything(
+      cause: Throwable,
+      removeJobFromStateBackend: Boolean)
+    : Seq[Future[Unit]] = {
+    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
+      future {
+        if (removeJobFromStateBackend) {
+          try {
+            submittedJobGraphs.removeJobGraph(jobID)
+          }
+          catch {
+            case t: Throwable => {
+              log.error("Error during submitted job graph clean up.", t)
+            }
+          }
         }
-      }
 
-      eg.fail(cause)
+        eg.fail(cause)
 
-      if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
-        jobInfo.client ! decorateMessage(
-          Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
-      }
+        if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
+          jobInfo.client ! decorateMessage(
+            Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
+        }
+      }(context.dispatcher)
     }
 
     currentJobs.clear()
+
+    futures.toSeq
   }
 
   override def grantLeadership(newLeaderSessionID: UUID): Unit = {
@@ -1285,7 +1321,9 @@ class JobManager(
       case accumulatorEvent =>
         currentJobs.get(accumulatorEvent.getJobID) match {
           case Some((jobGraph, jobInfo)) =>
-            jobGraph.updateAccumulators(accumulatorEvent)
+            future {
+              jobGraph.updateAccumulators(accumulatorEvent)
+            }(context.dispatcher)
           case None =>
           // ignore accumulator values for old job
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index d776622..8097bdc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGra
 import org.apache.flink.runtime.instance.{InstanceID, Instance}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph
 import org.apache.flink.runtime.util.SerializedThrowable
 
 import scala.collection.JavaConverters._
@@ -73,6 +74,14 @@ object JobManagerMessages {
   case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID
 
   /**
+   * Triggers the submission of the recovered job
+   *
+   * @param submittedJobGraph Contains the submitted JobGraph and the associated JobInfo
+   */
+  case class RecoverSubmittedJob(submittedJobGraph: SubmittedJobGraph)
+    extends RequiresLeaderSessionID
+
+  /**
    * Triggers recovery of all available jobs.
    */
   case class RecoverAllJobs() extends RequiresLeaderSessionID
@@ -286,6 +295,14 @@ object JobManagerMessages {
    */
   case class JobNotFound(jobID: JobID) extends JobResponse with JobStatusResponse
 
+  /** Triggers the removal of the job with the given job ID
+    *
+    * @param jobID
+    * @param removeJobFromStateBackend true if the job has properly finished
+    */
+  case class RemoveJob(jobID: JobID, removeJobFromStateBackend: Boolean = true)
+    extends RequiresLeaderSessionID
+
   /**
    * Removes the job belonging to the job identifier from the job manager and archives it.
    * @param jobID The job identifier

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
index ac250bd..e6156e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
@@ -194,7 +194,9 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
 			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
 					leadingJobManager, deadline.timeLeft());
 
-			// Make sure that the **non-leading** JM has actually removed the job graph from her
+			log.info("Wait that the non-leader removes the submitted job.");
+
+			// Make sure that the **non-leading** JM has actually removed the job graph from its
 			// local state.
 			boolean success = false;
 			while (!success && deadline.hasTimeLeft()) {
@@ -205,6 +207,7 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
 					success = true;
 				}
 				else {
+					log.info(((JobManagerMessages.CurrentJobStatus)jobStatusResponse).status().toString());
 					Thread.sleep(100);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index 7ae89d1..94e1988 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
+import java.util.List;
+
 /**
  * Simple ZooKeeper and CuratorFramework setup for tests.
  */
@@ -111,6 +113,14 @@ public class ZooKeeperTestEnvironment {
 		return client;
 	}
 
+	public String getClientNamespace() {
+		return client.getNamespace();
+	}
+
+	public List<String> getChildren(String path) throws Exception {
+		return client.getChildren().forPath(path);
+	}
+
 	/**
 	 * Creates a new client for the started ZooKeeper server/cluster.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 76b237e..1ca02aa 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 3a252f8..0f800c9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -657,7 +657,7 @@ class JobManagerITCase(_system: ActorSystem)
           jm.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
           expectMsg(JobSubmitSuccess(jobGraph2.getJobID))
 
-          // job stil running
+          // job still running
           jm.tell(RemoveCachedJob(jobGraph2.getJobID), self)
 
           expectMsgType[JobResultSuccess]

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
new file mode 100644
index 0000000..c0a2adc
--- /dev/null
+++ b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-shaded-curator</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-shaded-curator-recipes</artifactId>
+	<name>flink-shaded-curator-recipes</name>
+
+	<packaging>jar</packaging>
+
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-recipes</artifactId>
+			<version>${curator.version}</version>
+		</dependency>
+
+		<!-- Use Flink's Guava version here to avoid too many guava versions in Flink -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet combine.self="override">
+								<excludes>
+									<exclude>log4j</exclude>
+									<exclude>org.slf4j:slf4j-log4j12</exclude>
+								</excludes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-shaded-curator/flink-shaded-curator-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/flink-shaded-curator-test/pom.xml b/flink-shaded-curator/flink-shaded-curator-test/pom.xml
new file mode 100644
index 0000000..2700c0c
--- /dev/null
+++ b/flink-shaded-curator/flink-shaded-curator-test/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-shaded-curator</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-shaded-curator-test</artifactId>
+	<name>flink-shaded-curator-test</name>
+
+	<packaging>jar</packaging>
+
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+		</dependency>
+
+		<!-- Use Flink's Guava version here to avoid too many guava versions in Flink -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet combine.self="override">
+								<excludes>
+									<exclude>log4j</exclude>
+									<exclude>org.slf4j:slf4j-log4j12</exclude>
+								</excludes>
+								<includes combine.children="append">
+									<include>org.apache.curator:curator-test</include>
+								</includes>
+							</artifactSet>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>org.apache.curator</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-shaded-curator/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/pom.xml b/flink-shaded-curator/pom.xml
index ac62cc8..29d6461 100644
--- a/flink-shaded-curator/pom.xml
+++ b/flink-shaded-curator/pom.xml
@@ -1,22 +1,21 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
@@ -29,50 +28,13 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
+	<modules>
+        <module>flink-shaded-curator-recipes</module>
+		<module>flink-shaded-curator-test</module>
+	</modules>
+
 	<artifactId>flink-shaded-curator</artifactId>
 	<name>flink-shaded-curator</name>
 
-	<packaging>jar</packaging>
-
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-recipes</artifactId>
-			<version>${curator.version}</version>
-		</dependency>
-
-		<!-- Use Flink's Guava version here to avoid too many guava versions in Flink -->
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet combine.self="override">
-								<excludes>
-									<exclude>log4j</exclude>
-									<exclude>org.slf4j:slf4j-log4j12</exclude>
-								</excludes>
-							</artifactSet>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
+	<packaging>pom</packaging>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 0dd20b1..b9bae6f 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -148,6 +148,13 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-curator-test</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>org.scalatest</groupId>
 			<artifactId>scalatest_${scala.binary.version}</artifactId>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index a0c8312..2cdf83c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -523,22 +523,19 @@ public class ChaosMonkeyITCase {
 	}
 
 	private void checkCleanRecoveryState(Configuration config) throws Exception {
-		LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+		LOG.info("Checking " + ZooKeeper.getClientNamespace() +
 				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
-		List<String> jobGraphs = ZooKeeper.getClient().getChildren()
-				.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+		List<String> jobGraphs = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 		assertEquals("Unclean job graphs: " + jobGraphs, 0, jobGraphs.size());
 
-		LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+		LOG.info("Checking " + ZooKeeper.getClientNamespace() +
 				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
-		List<String> checkpoints = ZooKeeper.getClient().getChildren()
-				.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+		List<String> checkpoints = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
 		assertEquals("Unclean checkpoints: " + checkpoints, 0, checkpoints.size());
 
-		LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+		LOG.info("Checking " + ZooKeeper.getClientNamespace() +
 				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
-		List<String> checkpointCounter = ZooKeeper.getClient().getChildren()
-				.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		List<String> checkpointCounter = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
 		assertEquals("Unclean checkpoint counter: " + checkpointCounter, 0, checkpointCounter.size());
 
 		LOG.info("ZooKeeper state is clean");

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 94d0a81..a05621a 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -37,7 +38,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -53,6 +56,9 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 	private static final int numberApplicationAttempts = 10;
 
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
 	@BeforeClass
 	public static void setup() {
 		actorSystem = AkkaUtils.createDefaultActorSystem();
@@ -102,9 +108,14 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		String confDirPath = System.getenv("FLINK_CONF_DIR");
 		flinkYarnClient.setConfigurationDirectory(confDirPath);
 
+		String fsStateHandlePath = tmp.getRoot().getPath();
+
 		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
 		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" +
-			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts);
+			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
+			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
+			"@@" + ConfigConstants.STATE_BACKEND_FS_DIR + "=" + fsStateHandlePath + "/checkpoints" +
+			"@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		AbstractFlinkYarnCluster yarnCluster = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c2989f2b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 83d1f3c..fa70039 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -21,8 +21,10 @@ package org.apache.flink.yarn
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
@@ -60,7 +62,9 @@ class TestingYarnJobManager(
     delayBetweenRetries: Long,
     timeout: FiniteDuration,
     mode: StreamingMode,
-    leaderElectionService: LeaderElectionService)
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends YarnJobManager(
     flinkConfiguration,
     executionContext,
@@ -72,7 +76,9 @@ class TestingYarnJobManager(
     delayBetweenRetries,
     timeout,
     mode,
-    leaderElectionService)
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory)
   with TestingJobManagerLike {
 
   override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]