Posted to commits@flink.apache.org by se...@apache.org on 2017/11/02 15:51:56 UTC

[3/3] flink git commit: [FLINK-7778] [build] Shade ZooKeeper dependency (part 1)

[FLINK-7778] [build] Shade ZooKeeper dependency (part 1)

Shading the ZooKeeper dependency makes sure that exactly this version of
ZooKeeper is used by the Flink runtime module. The ZooKeeper version is
sensitive, because Flink's high availability relies on bug fixes that are
only available in later ZooKeeper versions.

This prevents situations where added dependencies (for example transitive
dependencies of Hadoop) pull a different ZooKeeper version onto the
classpath and cause it to be loaded instead.
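
For context, a minimal sketch of the shading approach used here (the execution
id, phase and XML layout below mirror the existing Flink shade setup; the actual
flink-runtime configuration in the hunk further down only appends the ZooKeeper
entries to an already existing shade execution):

	<plugin>
		<groupId>org.apache.maven.plugins</groupId>
		<artifactId>maven-shade-plugin</artifactId>
		<executions>
			<execution>
				<id>shade-flink</id>
				<phase>package</phase>
				<goals>
					<goal>shade</goal>
				</goals>
				<configuration>
					<artifactSet>
						<includes>
							<!-- bundle ZooKeeper into the flink-runtime jar -->
							<include>org.apache.zookeeper:*</include>
						</includes>
					</artifactSet>
					<relocations>
						<relocation>
							<!-- rewrite all references so the bundled version cannot
							     clash with a ZooKeeper version brought in elsewhere -->
							<pattern>org.apache.zookeeper</pattern>
							<shadedPattern>org.apache.flink.shaded.org.apache.zookeeper</shadedPattern>
						</relocation>
					</relocations>
				</configuration>
			</execution>
		</executions>
	</plugin>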

This commit also removes the 'flink-shaded-curator' module, which was originally
created to shade Guava within Curator. It is now obsolete, because newer
versions of Curator already shade Guava themselves.
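
As a usage note, modules that previously depended on the removed
'flink-shaded-curator-recipes' artifact now declare Curator directly, with Guava
excluded. A minimal sketch of such a declaration (matching the flink-runtime/pom.xml
hunk below; ${curator.version} is resolved by the root pom):

	<dependency>
		<groupId>org.apache.curator</groupId>
		<artifactId>curator-recipes</artifactId>
		<version>${curator.version}</version>
		<exclusions>
			<!-- Curator bundles its own relocated Guava but still declares the
			     dependency, so Guava can safely be excluded here -->
			<exclusion>
				<groupId>com.google.guava</groupId>
				<artifactId>guava</artifactId>
			</exclusion>
		</exclusions>
	</dependency>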


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d028236
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d028236
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d028236

Branch: refs/heads/master
Commit: 4d0282364f2e70fce0b55e218edccabd8e079fd8
Parents: 786a6cb
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 13 22:14:09 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 2 16:51:25 2017 +0100

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  38 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     | 447 ++++++++++++++++++
 .../flink-shaded-curator-recipes/pom.xml        |  81 ----
 .../flink-shaded-curator-test/pom.xml           |  95 ----
 flink-shaded-curator/pom.xml                    |  41 --
 flink-tests/pom.xml                             |   6 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     | 448 -------------------
 pom.xml                                         |   1 -
 8 files changed, 480 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index a450f1c..46990a9 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -152,11 +152,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.zookeeper</groupId>
-			<artifactId>zookeeper</artifactId>
-		</dependency>
-
-		<dependency>
 			<groupId>org.xerial.snappy</groupId>
 			<artifactId>snappy-java</artifactId>
 			<version>1.1.4</version>
@@ -174,10 +169,26 @@ under the License.
 			<version>${chill.version}</version>
 		</dependency>
 
+		<!-- Curator and ZooKeeper - we explicitly add ZooKeeper here as
+			well to make sure our managed version is used -->
+
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-curator-recipes</artifactId>
-			<version>${project.version}</version>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-recipes</artifactId>
+			<version>${curator.version}</version>
+			<exclusions>
+				<!-- curator shades guava, but still has a dependency on it. -->
+				<!-- We can safely exclude Guava here -->
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<!-- test dependencies -->
@@ -437,6 +448,8 @@ under the License.
 									<include>com.typesafe.akka:akka-remote_*</include>
 									<include>io.netty:netty</include>
 									<include>org.uncommons.maths:uncommons-maths</include>
+									<include>org.apache.curator:*</include>
+									<include>org.apache.zookeeper:*</include>
 								</includes>
 							</artifactSet>
 							<relocations combine.children="append">
@@ -452,6 +465,15 @@ under the License.
 									<pattern>org.apache.curator</pattern>
 									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
 								</relocation>
+								<relocation>
+									<pattern>org.apache.zookeeper</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.zookeeper</shadedPattern>
+								</relocation>
+								<!-- jute is already shaded into the ZooKeeper jar -->
+								<relocation>
+									<pattern>org.apache.jute</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.zookeeper.jute</shadedPattern>
+								</relocation>
 							</relocations>
 							<filters>
 								<filter>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
new file mode 100644
index 0000000..f5d6802
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests recovery of {@link SubmittedJobGraph} instances.
+ */
+public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
+
+	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		ZooKeeper.shutdown();
+	}
+
+	@Before
+	public void cleanUp() throws Exception {
+		ZooKeeper.deleteAll();
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Tests that the HA job is not cleaned up when the jobmanager is stopped.
+	 */
+	@Test
+	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
+
+		// Configure the cluster
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+		TestingCluster flink = new TestingCluster(config, false, false);
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Start the JobManager and TaskManager
+			flink.start(true);
+
+			JobGraph jobGraph = createBlockingJobGraph();
+
+			// Set restart strategy to guard against shut down races.
+			// If the TM fails before the JM, it might happen that the
+			// Job is failed, leading to state removal.
+			ExecutionConfig ec = new ExecutionConfig();
+			ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100));
+			jobGraph.setExecutionConfig(ec);
+
+			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
+
+			// Submit the job
+			jobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
+
+			// Wait for the job to start
+			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
+					jobManager, deadline.timeLeft());
+		}
+		finally {
+			flink.stop();
+		}
+
+		// verify that the persisted job data has not been removed from ZooKeeper when the JM has
+		// been shutdown
+		verifyRecoveryState(config);
+	}
+
+	/**
+	 * Tests that clients receive updates after recovery by a new leader.
+	 */
+	@Test
+	public void testClientNonDetachedListeningBehaviour() throws Exception {
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
+
+		// Test actor system
+		ActorSystem testSystem = null;
+
+		// JobManager setup. Start the job managers as separate processes in order to not run the
+		// actors postStop, which cleans up all running jobs.
+		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
+
+		LeaderRetrievalService leaderRetrievalService = null;
+
+		ActorSystem taskManagerSystem = null;
+
+		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			TestingUtils.defaultExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Test actor system
+			testSystem = AkkaUtils.createActorSystem(new Configuration(),
+					new Some<>(new Tuple2<String, Object>("localhost", 0)));
+
+			// The job managers
+			jobManagerProcess[0] = new JobManagerProcess(0, config);
+			jobManagerProcess[1] = new JobManagerProcess(1, config);
+
+			jobManagerProcess[0].startProcess();
+			jobManagerProcess[1].startProcess();
+
+			// Leader listener
+			TestingListener leaderListener = new TestingListener();
+			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+			leaderRetrievalService.start(leaderListener);
+
+			// The task manager
+			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+			TaskManager.startTaskManagerComponentsAndActor(
+				config,
+				ResourceID.generate(),
+				taskManagerSystem,
+				highAvailabilityServices,
+				new NoOpMetricRegistry(),
+				"localhost",
+				Option.<String>empty(),
+				false,
+				TaskManager.class);
+
+			// Client test actor
+			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
+					testSystem, Props.create(RecordingTestClient.class));
+
+			JobGraph jobGraph = createBlockingJobGraph();
+
+			{
+				// Initial submission
+				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+				String leaderAddress = leaderListener.getAddress();
+				UUID leaderId = leaderListener.getLeaderSessionID();
+
+				// The client
+				AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
+
+				// Get the leader ref
+				ActorRef leaderRef = AkkaUtils.getActorRef(
+						leaderAddress, testSystem, deadline.timeLeft());
+				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+
+				int numSlots = 0;
+				while (numSlots == 0) {
+					Future<?> slotsFuture = leader.ask(JobManagerMessages
+							.getRequestTotalNumberOfSlots(), deadline.timeLeft());
+
+					numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
+				}
+
+				// Submit the job in non-detached mode
+				leader.tell(new SubmitJob(jobGraph,
+						ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
+
+				JobManagerActorTestUtils.waitForJobStatus(
+						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+			}
+
+			// Who's the boss?
+			JobManagerProcess leadingJobManagerProcess;
+			if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
+				leadingJobManagerProcess = jobManagerProcess[0];
+			}
+			else {
+				leadingJobManagerProcess = jobManagerProcess[1];
+			}
+
+			// Kill the leading job manager process
+			leadingJobManagerProcess.destroy();
+
+			{
+				// Recovery by the standby JobManager
+				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+				String leaderAddress = leaderListener.getAddress();
+				UUID leaderId = leaderListener.getLeaderSessionID();
+
+				ActorRef leaderRef = AkkaUtils.getActorRef(
+						leaderAddress, testSystem, deadline.timeLeft());
+				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+
+				JobManagerActorTestUtils.waitForJobStatus(
+						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+
+				// Cancel the job
+				leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+			}
+
+			// Wait for the execution result
+			clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());
+
+			int jobSubmitSuccessMessages = 0;
+			for (Object msg : clientRef.underlyingActor().getMessages()) {
+				if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
+					jobSubmitSuccessMessages++;
+				}
+			}
+
+			// At least two submissions should be ack-ed (initial and recovery). This is quite
+			// conservative, but it is still possible that these messages are overtaken by the
+			// final message.
+			assertEquals(2, jobSubmitSuccessMessages);
+		}
+		catch (Throwable t) {
+			// Print early (in some situations the process logs get too big
+			// for Travis and the root problem is not shown)
+			t.printStackTrace();
+
+			// In case of an error, print the job manager process logs.
+			if (jobManagerProcess[0] != null) {
+				jobManagerProcess[0].printProcessLog();
+			}
+
+			if (jobManagerProcess[1] != null) {
+				jobManagerProcess[1].printProcessLog();
+			}
+
+			throw t;
+		}
+		finally {
+			if (jobManagerProcess[0] != null) {
+				jobManagerProcess[0].destroy();
+			}
+
+			if (jobManagerProcess[1] != null) {
+				jobManagerProcess[1].destroy();
+			}
+
+			if (leaderRetrievalService != null) {
+				leaderRetrievalService.stop();
+			}
+
+			if (taskManagerSystem != null) {
+				taskManagerSystem.shutdown();
+			}
+
+			if (testSystem != null) {
+				testSystem.shutdown();
+			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
+		}
+	}
+
+	/**
+	 * Simple recording client.
+	 */
+	private static class RecordingTestClient extends UntypedActor {
+
+		private final Queue<Object> messages = new ConcurrentLinkedQueue<>();
+
+		private CountDownLatch jobResultLatch = new CountDownLatch(1);
+
+		@Override
+		public void onReceive(Object message) throws Exception {
+			if (message instanceof LeaderSessionMessage) {
+				message = ((LeaderSessionMessage) message).message();
+			}
+
+			messages.add(message);
+
+			// Check for job result
+			if (message instanceof JobManagerMessages.JobResultFailure ||
+					message instanceof JobManagerMessages.JobResultSuccess) {
+
+				jobResultLatch.countDown();
+			}
+		}
+
+		public Queue<Object> getMessages() {
+			return messages;
+		}
+
+		public void awaitJobResult(long timeout) throws InterruptedException {
+			jobResultLatch.await(timeout, TimeUnit.MILLISECONDS);
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a simple blocking JobGraph.
+	 */
+	private static JobGraph createBlockingJobGraph() {
+		JobGraph jobGraph = new JobGraph("Blocking program");
+
+		JobVertex jobVertex = new JobVertex("Blocking Vertex");
+		jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+		jobGraph.addVertex(jobVertex);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
+	 */
+	private void verifyCleanRecoveryState(Configuration config) throws Exception {
+		// File state backend empty
+		Collection<File> stateHandles = FileUtils.listFiles(
+				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+
+		if (!stateHandles.isEmpty()) {
+			fail("File state backend is not clean: " + stateHandles);
+		}
+
+		// ZooKeeper
+		String currentJobsPath = config.getString(
+				HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
+
+		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
+
+		if (stat.getCversion() == 0) {
+			// Sanity check: verify that some changes have been performed
+			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
+					"this test. What are you testing?");
+		}
+
+		if (stat.getNumChildren() != 0) {
+			// Is everything clean again?
+			fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
+					ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
+		}
+	}
+
+	/**
+	 * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
+	 */
+	private void verifyRecoveryState(Configuration config) throws Exception {
+		// File state backend empty
+		Collection<File> stateHandles = FileUtils.listFiles(
+				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+
+		if (stateHandles.isEmpty()) {
+			fail("File state backend has been cleaned: " + stateHandles);
+		}
+
+		// ZooKeeper
+		String currentJobsPath = config.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
+
+		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
+
+		if (stat.getCversion() == 0) {
+			// Sanity check: verify that some changes have been performed
+			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
+				"this test. What are you testing?");
+		}
+
+		if (stat.getNumChildren() == 0) {
+			// Children have been cleaned up?
+			fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
+				ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
deleted file mode 100644
index 61897d1..0000000
--- a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-shaded-curator</artifactId>
-		<version>1.4-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-shaded-curator-recipes</artifactId>
-	<name>flink-shaded-curator-recipes</name>
-
-	<packaging>jar</packaging>
-
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-recipes</artifactId>
-			<version>${curator.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet combine.self="override">
-								<includes>
-									<include>com.google.guava:*</include>
-									<include>org.apache.curator:*</include>
-								</includes>
-							</artifactSet>
-							<relocations>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.curator.shaded.com.google</shadedPattern>
-									<excludes>
-										<exclude>com.google.protobuf.**</exclude>
-										<exclude>com.google.inject.**</exclude>
-									</excludes>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-shaded-curator/flink-shaded-curator-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/flink-shaded-curator-test/pom.xml b/flink-shaded-curator/flink-shaded-curator-test/pom.xml
deleted file mode 100644
index 2a18162..0000000
--- a/flink-shaded-curator/flink-shaded-curator-test/pom.xml
+++ /dev/null
@@ -1,95 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-shaded-curator</artifactId>
-		<version>1.4-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-shaded-curator-test</artifactId>
-	<name>flink-shaded-curator-test</name>
-
-	<packaging>jar</packaging>
-
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-test</artifactId>
-			<version>${curator.version}</version>
-		</dependency>
-
-		<!-- Use Flink's Guava version here to avoid too many guava versions in Flink -->
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet combine.self="override">
-								<excludes>
-									<exclude>log4j</exclude>
-									<exclude>org.slf4j:slf4j-log4j12</exclude>
-								</excludes>
-								<includes combine.children="append">
-									<include>org.apache.curator:curator-test</include>
-									<include>com.google.guava:guava</include>
-								</includes>
-							</artifactSet>
-							<relocations combine.children="append">
-								<relocation>
-									<pattern>org.apache.curator</pattern>
-									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.curator.shaded.com.google</shadedPattern>
-									<excludes>
-										<exclude>com.google.protobuf.**</exclude>
-										<exclude>com.google.inject.**</exclude>
-									</excludes>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-shaded-curator/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/pom.xml b/flink-shaded-curator/pom.xml
deleted file mode 100644
index d08320d..0000000
--- a/flink-shaded-curator/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.4-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<modules>
-		<module>flink-shaded-curator-recipes</module>
-		<module>flink-shaded-curator-test</module>
-	</modules>
-
-	<artifactId>flink-shaded-curator</artifactId>
-	<name>flink-shaded-curator</name>
-
-	<packaging>pom</packaging>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 4e50a9d..8e10a2e 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -200,9 +200,9 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-curator-test</artifactId>
-			<version>${project.version}</version>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
deleted file mode 100644
index ee37d6d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
-import org.apache.flink.runtime.leaderelection.TestingListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
-import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.testkit.TestActorRef;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.TrueFileFilter;
-import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Tests recovery of {@link SubmittedJobGraph} instances.
- */
-public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
-
-	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
-
-	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		ZooKeeper.shutdown();
-	}
-
-	@Before
-	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	/**
-	 * Tests that the HA job is not cleaned up when the jobmanager is stopped.
-	 */
-	@Test
-	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
-
-		// Configure the cluster
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-
-		TestingCluster flink = new TestingCluster(config, false, false);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Start the JobManager and TaskManager
-			flink.start(true);
-
-			JobGraph jobGraph = createBlockingJobGraph();
-
-			// Set restart strategy to guard against shut down races.
-			// If the TM fails before the JM, it might happen that the
-			// Job is failed, leading to state removal.
-			ExecutionConfig ec = new ExecutionConfig();
-			ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100));
-			jobGraph.setExecutionConfig(ec);
-
-			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
-
-			// Submit the job
-			jobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
-
-			// Wait for the job to start
-			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
-					jobManager, deadline.timeLeft());
-		}
-		finally {
-			flink.stop();
-		}
-
-		// verify that the persisted job data has not been removed from ZooKeeper when the JM has
-		// been shutdown
-		verifyRecoveryState(config);
-	}
-
-	/**
-	 * Tests that clients receive updates after recovery by a new leader.
-	 */
-	@Test
-	public void testClientNonDetachedListeningBehaviour() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
-
-		// Test actor system
-		ActorSystem testSystem = null;
-
-		// JobManager setup. Start the job managers as separate processes in order to not run the
-		// actors postStop, which cleans up all running jobs.
-		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
-
-		LeaderRetrievalService leaderRetrievalService = null;
-
-		ActorSystem taskManagerSystem = null;
-
-		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-			config,
-			TestingUtils.defaultExecutor(),
-			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Test actor system
-			testSystem = AkkaUtils.createActorSystem(new Configuration(),
-					new Some<>(new Tuple2<String, Object>("localhost", 0)));
-
-			// The job managers
-			jobManagerProcess[0] = new JobManagerProcess(0, config);
-			jobManagerProcess[1] = new JobManagerProcess(1, config);
-
-			jobManagerProcess[0].startProcess();
-			jobManagerProcess[1].startProcess();
-
-			// Leader listener
-			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-			leaderRetrievalService.start(leaderListener);
-
-			// The task manager
-			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-			TaskManager.startTaskManagerComponentsAndActor(
-				config,
-				ResourceID.generate(),
-				taskManagerSystem,
-				highAvailabilityServices,
-				new NoOpMetricRegistry(),
-				"localhost",
-				Option.<String>empty(),
-				false,
-				TaskManager.class);
-
-			// Client test actor
-			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
-					testSystem, Props.create(RecordingTestClient.class));
-
-			JobGraph jobGraph = createBlockingJobGraph();
-
-			{
-				// Initial submission
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				// The client
-				AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
-
-				// Get the leader ref
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				int numSlots = 0;
-				while (numSlots == 0) {
-					Future<?> slotsFuture = leader.ask(JobManagerMessages
-							.getRequestTotalNumberOfSlots(), deadline.timeLeft());
-
-					numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
-				}
-
-				// Submit the job in non-detached mode
-				leader.tell(new SubmitJob(jobGraph,
-						ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
-
-				JobManagerActorTestUtils.waitForJobStatus(
-						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-			}
-
-			// Who's the boss?
-			JobManagerProcess leadingJobManagerProcess;
-			if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
-				leadingJobManagerProcess = jobManagerProcess[0];
-			}
-			else {
-				leadingJobManagerProcess = jobManagerProcess[1];
-			}
-
-			// Kill the leading job manager process
-			leadingJobManagerProcess.destroy();
-
-			{
-				// Recovery by the standby JobManager
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				JobManagerActorTestUtils.waitForJobStatus(
-						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-
-				// Cancel the job
-				leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
-			}
-
-			// Wait for the execution result
-			clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());
-
-			int jobSubmitSuccessMessages = 0;
-			for (Object msg : clientRef.underlyingActor().getMessages()) {
-				if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
-					jobSubmitSuccessMessages++;
-				}
-			}
-
-			// At least two submissions should be ack-ed (initial and recovery). This is quite
-			// conservative, but it is still possible that these messages are overtaken by the
-			// final message.
-			assertEquals(2, jobSubmitSuccessMessages);
-		}
-		catch (Throwable t) {
-			// Print early (in some situations the process logs get too big
-			// for Travis and the root problem is not shown)
-			t.printStackTrace();
-
-			// In case of an error, print the job manager process logs.
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].printProcessLog();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].printProcessLog();
-			}
-
-			throw t;
-		}
-		finally {
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].destroy();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].destroy();
-			}
-
-			if (leaderRetrievalService != null) {
-				leaderRetrievalService.stop();
-			}
-
-			if (taskManagerSystem != null) {
-				taskManagerSystem.shutdown();
-			}
-
-			if (testSystem != null) {
-				testSystem.shutdown();
-			}
-
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-	}
-
-	/**
-	 * Simple recording client.
-	 */
-	private static class RecordingTestClient extends UntypedActor {
-
-		private final Queue<Object> messages = new ConcurrentLinkedQueue<>();
-
-		private CountDownLatch jobResultLatch = new CountDownLatch(1);
-
-		@Override
-		public void onReceive(Object message) throws Exception {
-			if (message instanceof LeaderSessionMessage) {
-				message = ((LeaderSessionMessage) message).message();
-			}
-
-			messages.add(message);
-
-			// Check for job result
-			if (message instanceof JobManagerMessages.JobResultFailure ||
-					message instanceof JobManagerMessages.JobResultSuccess) {
-
-				jobResultLatch.countDown();
-			}
-		}
-
-		public Queue<Object> getMessages() {
-			return messages;
-		}
-
-		public void awaitJobResult(long timeout) throws InterruptedException {
-			jobResultLatch.await(timeout, TimeUnit.MILLISECONDS);
-		}
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates a simple blocking JobGraph.
-	 */
-	private static JobGraph createBlockingJobGraph() {
-		JobGraph jobGraph = new JobGraph("Blocking program");
-
-		JobVertex jobVertex = new JobVertex("Blocking Vertex");
-		jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
-
-		jobGraph.addVertex(jobVertex);
-
-		return jobGraph;
-	}
-
-	/**
-	 * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
-	 */
-	private void verifyCleanRecoveryState(Configuration config) throws Exception {
-		// File state backend empty
-		Collection<File> stateHandles = FileUtils.listFiles(
-				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
-
-		if (!stateHandles.isEmpty()) {
-			fail("File state backend is not clean: " + stateHandles);
-		}
-
-		// ZooKeeper
-		String currentJobsPath = config.getString(
-				HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
-
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
-
-		if (stat.getCversion() == 0) {
-			// Sanity check: verify that some changes have been performed
-			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
-					"this test. What are you testing?");
-		}
-
-		if (stat.getNumChildren() != 0) {
-			// Is everything clean again?
-			fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
-					ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
-		}
-	}
-
-	/**
-	 * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
-	 */
-	private void verifyRecoveryState(Configuration config) throws Exception {
-		// File state backend empty
-		Collection<File> stateHandles = FileUtils.listFiles(
-				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
-
-		if (stateHandles.isEmpty()) {
-			fail("File state backend has been cleaned: " + stateHandles);
-		}
-
-		// ZooKeeper
-		String currentJobsPath = config.getString(
-			HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
-
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
-
-		if (stat.getCversion() == 0) {
-			// Sanity check: verify that some changes have been performed
-			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
-				"this test. What are you testing?");
-		}
-
-		if (stat.getNumChildren() == 0) {
-			// Children have been cleaned up?
-			fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
-				ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9cf603a..a5f99f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,6 @@ under the License.
 		<module>tools/force-shading</module>
 		<module>flink-annotations</module>
 		<module>flink-shaded-hadoop</module>
-		<module>flink-shaded-curator</module>
 		<module>flink-core</module>
 		<module>flink-java</module>
 		<module>flink-java8</module>