Posted to commits@flink.apache.org by se...@apache.org on 2017/11/02 15:51:54 UTC

[1/3] flink git commit: [FLINK-7778] [build] Shade ZooKeeper dependency (followups)

Repository: flink
Updated Branches:
  refs/heads/master 786a6cbb3 -> 8afadd459


[FLINK-7778] [build] Shade ZooKeeper dependency (followups)

  - Rename the 'flink-shaded-curator-recipes' module to 'flink-shaded-curator',
    because it actually contains more curator code than just the recipes.

  - Move the exception handling logic of 'ZookeeperAccess' directly into the
    ZooKeeperStateHandleStore (a condensed sketch of the resulting pattern
    follows below).
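
  For illustration, the mapping of ZooKeeper exceptions now sits directly at
  the call sites inside ZooKeeperStateHandleStore, roughly in the shape shown
  here. This is a condensed sketch, not the exact method bodies from the diff;
  'client', 'path' and 'serializedStateHandle' stand in for the surrounding
  fields and arguments.

      import java.util.ConcurrentModificationException;

      import org.apache.curator.framework.CuratorFramework;
      import org.apache.zookeeper.KeeperException;

      final class ZooKeeperWriteSketch {

          /** Creates the node; an already existing node surfaces as a ConcurrentModificationException. */
          static void add(CuratorFramework client, String path, byte[] serializedStateHandle) throws Exception {
              try {
                  client.create().forPath(path, serializedStateHandle);
              } catch (KeeperException.NodeExistsException e) {
                  // same message as in the actual change
                  throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
              }
          }

          /** Replaces the node contents; a missing node surfaces as a ConcurrentModificationException. */
          static void replace(
                  CuratorFramework client, String path,
                  int expectedVersion, byte[] serializedStateHandle) throws Exception {
              try {
                  client.setData()
                      .withVersion(expectedVersion)
                      .forPath(path, serializedStateHandle);
              } catch (KeeperException.NoNodeException e) {
                  throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
              }
          }
      }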


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8afadd45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8afadd45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8afadd45

Branch: refs/heads/master
Commit: 8afadd459294bad8c8eecd1b0c75f773bfe27704
Parents: d368a07
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 2 14:47:39 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 2 16:51:25 2017 +0100

----------------------------------------------------------------------
 .../store/ZooKeeperMesosWorkerStore.java        | 18 ++--
 .../zookeeper/ZooKeeperStateHandleStore.java    | 10 ++-
 .../runtime/zookeeper/ZookeeperAccess.java      | 66 ---------------
 flink-shaded-curator-recipes/pom.xml            | 88 --------------------
 flink-shaded-curator/pom.xml                    | 88 ++++++++++++++++++++
 pom.xml                                         |  2 +-
 6 files changed, 102 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8afadd45/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 738f99e..9f2fa44 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -25,10 +25,10 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
-import org.apache.flink.runtime.zookeeper.ZookeeperAccess;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.mesos.Protos;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -214,19 +214,11 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 			int currentVersion = workersInZooKeeper.exists(path);
 			if (currentVersion == -1) {
-				try {
-					workersInZooKeeper.addAndLock(path, worker);
-					LOG.debug("Added {} in ZooKeeper.", worker);
-				} catch (Exception ex) {
-					throw ZookeeperAccess.wrapIfZooKeeperNodeExistsException(ex);
-				}
+				workersInZooKeeper.addAndLock(path, worker);
+				LOG.debug("Added {} in ZooKeeper.", worker);
 			} else {
-				try {
-					workersInZooKeeper.replace(path, currentVersion, worker);
-					LOG.debug("Updated {} in ZooKeeper.", worker);
-				} catch (Exception ex) {
-					throw ZookeeperAccess.wrapIfZooKeeperNoNodeException(ex);
-				}
+				workersInZooKeeper.replace(path, currentVersion, worker);
+				LOG.debug("Updated {} in ZooKeeper.", worker);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8afadd45/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index dc3f7d7..f0d67fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -41,6 +41,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Executor;
@@ -163,6 +164,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 			success = true;
 			return storeHandle;
 		}
+		catch (KeeperException.NodeExistsException e) {
+			throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
+		}
 		finally {
 			if (!success) {
 				// Cleanup the state handle if it was not written to ZooKeeper.
@@ -202,8 +206,10 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					.withVersion(expectedVersion)
 					.forPath(path, serializedStateHandle);
 			success = true;
+		} catch (KeeperException.NoNodeException e) {
+			throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
 		} finally {
-			if(success) {
+			if (success) {
 				oldStateHandle.discardState();
 			} else {
 				newStateHandle.discardState();
@@ -673,7 +679,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 			}
 
 		}
-	};
+	}
 
 	/**
 	 * Callback interface for remove calls

http://git-wip-us.apache.org/repos/asf/flink/blob/8afadd45/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZookeeperAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZookeeperAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZookeeperAccess.java
deleted file mode 100644
index 2c6160a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZookeeperAccess.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.zookeeper;
-
-import org.apache.zookeeper.KeeperException;
-
-import java.util.ConcurrentModificationException;
-
-/**
- * Utility class providing access to relocated zookeeper classes.
- *
- * <p>This class is necessary as flink-runtime relocates its ZooKeeper dependency.
- * Other modules may still depend on this dependency but will encounter a ClassNotFoundException
- * on access as they don't apply the relocation pattern of flink-runtime.
- */
-public final class ZookeeperAccess {
-	
-	private ZookeeperAccess(){
-	}
-
-	/**
-	 * Wraps and returns the given exception in a {@link ConcurrentModificationException} if it is a
-	 * {@link org.apache.zookeeper.KeeperException.NodeExistsException}. Otherwise the
-	 * given exception is returned.
-	 *
-	 * @param ex exception to wrap
-	 * @return wrapping ConcurrentModificationException if it is a NodeExistsException, otherwise the given exception
-	 */
-	public static Exception wrapIfZooKeeperNodeExistsException(Exception ex) {
-		if (ex instanceof KeeperException.NodeExistsException) {
-			return new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
-		}
-		return ex;
-	}
-
-	/**
-	 * Wraps and returns the given exception in a {@link ConcurrentModificationException} if it is a
-	 * {@link org.apache.zookeeper.KeeperException.NoNodeException}. Otherwise the
-	 * given exception is returned.
-	 *
-	 * @param ex exception to wrap
-	 * @return wrapping ConcurrentModificationException if it is a NoNodeException, otherwise the given exception
-	 */
-	public static Exception wrapIfZooKeeperNoNodeException(Exception ex) {
-		if (ex instanceof KeeperException.NoNodeException) {
-			return new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
-		}
-		return ex;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8afadd45/flink-shaded-curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator-recipes/pom.xml b/flink-shaded-curator-recipes/pom.xml
deleted file mode 100644
index 06d3821..0000000
--- a/flink-shaded-curator-recipes/pom.xml
+++ /dev/null
@@ -1,88 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.4-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-shaded-curator-recipes</artifactId>
-	<name>flink-shaded-curator-recipes</name>
-
-	<packaging>jar</packaging>
-
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-recipes</artifactId>
-			<version>${curator.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet>
-								<includes>
-									<include>com.google.guava:guava</include>
-									<include>org.apache.curator:*</include>
-								</includes>
-							</artifactSet>
-							<filters>
-								<filter>
-									<artifact>com.google.guava:guava</artifact>
-									<!-- Shade guava classes that are not included by curator -->
-									<includes>
-										<include>com/google/common/base/Function.class</include>
-										<include>com/google/common/base/Predicate.class</include>
-										<include>com/google/common/reflect/TypeToken.class</include>
-									</includes>
-								</filter>
-							</filters>
-							<relocations>
-								<relocation>
-									<pattern>com.google.common</pattern>
-									<shadedPattern>org.apache.flink.curator.shaded.com.google.common</shadedPattern>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8afadd45/flink-shaded-curator/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/pom.xml b/flink-shaded-curator/pom.xml
new file mode 100644
index 0000000..3c2e80f
--- /dev/null
+++ b/flink-shaded-curator/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-shaded-curator</artifactId>
+	<name>flink-shaded-curator</name>
+
+	<packaging>jar</packaging>
+
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-recipes</artifactId>
+			<version>${curator.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>com.google.guava:guava</include>
+									<include>org.apache.curator:*</include>
+								</includes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>com.google.guava:guava</artifact>
+									<!-- Shade guava classes that are not included by curator -->
+									<includes>
+										<include>com/google/common/base/Function.class</include>
+										<include>com/google/common/base/Predicate.class</include>
+										<include>com/google/common/reflect/TypeToken.class</include>
+									</includes>
+								</filter>
+							</filters>
+							<relocations>
+								<relocation>
+									<pattern>com.google.common</pattern>
+									<shadedPattern>org.apache.flink.curator.shaded.com.google.common</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8afadd45/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index faddab4..9cf603a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@ under the License.
 		<module>tools/force-shading</module>
 		<module>flink-annotations</module>
 		<module>flink-shaded-hadoop</module>
-		<module>flink-shaded-curator-recipes</module>
+		<module>flink-shaded-curator</module>
 		<module>flink-core</module>
 		<module>flink-java</module>
 		<module>flink-java8</module>


[3/3] flink git commit: [FLINK-7778] [build] Shade ZooKeeper dependency (part 1)

Posted by se...@apache.org.
[FLINK-7778] [build] Shade ZooKeeper dependency (part 1)

Shading the ZooKeeper dependency makes sure that this specific version of
ZooKeeper is used by the Flink runtime module. The exact ZooKeeper version
matters because Flink's high availability relies on bug fixes in later
ZooKeeper versions.

This prevents situations where, for example, a set of added dependencies (such
as transitive dependencies of Hadoop) causes a different ZooKeeper version to
end up on the classpath and be loaded.
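
As a concrete illustration of what the relocation (configured further down in
flink-runtime/pom.xml) means for users of the shaded flink-runtime jar: the
ZooKeeper classes it bundles live under a Flink-specific package, so a probe
like the following, run with the shaded jar on the classpath, should resolve
the relocated name rather than the original one. The probe class itself is
only an illustration and not part of this commit.

      public final class ShadedZooKeeperProbe {

          public static void main(String[] args) {
              // Derived from the relocation pattern in the shade-plugin config:
              // org.apache.zookeeper -> org.apache.flink.shaded.org.apache.zookeeper
              String relocated = "org.apache.flink.shaded.org.apache.zookeeper.KeeperException";
              try {
                  Class.forName(relocated);
                  System.out.println("Shaded flink-runtime provides " + relocated);
              } catch (ClassNotFoundException e) {
                  System.out.println("Relocated ZooKeeper classes not found; "
                          + "an unshaded classpath is being used");
              }
          }
      }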

This commit also removes the 'flink-shaded-curator' module, which was originally
created to shade Guava within Curator but is now obsolete because newer versions
of Curator already shade Guava.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d028236
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d028236
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d028236

Branch: refs/heads/master
Commit: 4d0282364f2e70fce0b55e218edccabd8e079fd8
Parents: 786a6cb
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 13 22:14:09 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 2 16:51:25 2017 +0100

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  38 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     | 447 ++++++++++++++++++
 .../flink-shaded-curator-recipes/pom.xml        |  81 ----
 .../flink-shaded-curator-test/pom.xml           |  95 ----
 flink-shaded-curator/pom.xml                    |  41 --
 flink-tests/pom.xml                             |   6 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     | 448 -------------------
 pom.xml                                         |   1 -
 8 files changed, 480 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index a450f1c..46990a9 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -152,11 +152,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.zookeeper</groupId>
-			<artifactId>zookeeper</artifactId>
-		</dependency>
-
-		<dependency>
 			<groupId>org.xerial.snappy</groupId>
 			<artifactId>snappy-java</artifactId>
 			<version>1.1.4</version>
@@ -174,10 +169,26 @@ under the License.
 			<version>${chill.version}</version>
 		</dependency>
 
+		<!-- Curator and ZooKeeper - we explicitly add ZooKeeper here as
+			well to make sure our managed version is used -->
+
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-curator-recipes</artifactId>
-			<version>${project.version}</version>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-recipes</artifactId>
+			<version>${curator.version}</version>
+			<exclusions>
+				<!-- curator shades guava, but still has a dependency on it. -->
+				<!-- We can safely exclude Guava here -->
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<!-- test dependencies -->
@@ -437,6 +448,8 @@ under the License.
 									<include>com.typesafe.akka:akka-remote_*</include>
 									<include>io.netty:netty</include>
 									<include>org.uncommons.maths:uncommons-maths</include>
+									<include>org.apache.curator:*</include>
+									<include>org.apache.zookeeper:*</include>
 								</includes>
 							</artifactSet>
 							<relocations combine.children="append">
@@ -452,6 +465,15 @@ under the License.
 									<pattern>org.apache.curator</pattern>
 									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
 								</relocation>
+								<relocation>
+									<pattern>org.apache.zookeeper</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.zookeeper</shadedPattern>
+								</relocation>
+								<!-- jute is already shaded into the ZooKeeper jar -->
+								<relocation>
+									<pattern>org.apache.jute</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.zookeeper.jute</shadedPattern>
+								</relocation>
 							</relocations>
 							<filters>
 								<filter>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
new file mode 100644
index 0000000..f5d6802
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests recovery of {@link SubmittedJobGraph} instances.
+ */
+public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
+
+	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		ZooKeeper.shutdown();
+	}
+
+	@Before
+	public void cleanUp() throws Exception {
+		ZooKeeper.deleteAll();
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Tests that the HA job is not cleaned up when the jobmanager is stopped.
+	 */
+	@Test
+	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
+
+		// Configure the cluster
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+		TestingCluster flink = new TestingCluster(config, false, false);
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Start the JobManager and TaskManager
+			flink.start(true);
+
+			JobGraph jobGraph = createBlockingJobGraph();
+
+			// Set restart strategy to guard against shut down races.
+			// If the TM fails before the JM, it might happen that the
+			// Job is failed, leading to state removal.
+			ExecutionConfig ec = new ExecutionConfig();
+			ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100));
+			jobGraph.setExecutionConfig(ec);
+
+			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
+
+			// Submit the job
+			jobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
+
+			// Wait for the job to start
+			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
+					jobManager, deadline.timeLeft());
+		}
+		finally {
+			flink.stop();
+		}
+
+		// verify that the persisted job data has not been removed from ZooKeeper when the JM has
+		// been shutdown
+		verifyRecoveryState(config);
+	}
+
+	/**
+	 * Tests that clients receive updates after recovery by a new leader.
+	 */
+	@Test
+	public void testClientNonDetachedListeningBehaviour() throws Exception {
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
+
+		// Test actor system
+		ActorSystem testSystem = null;
+
+		// JobManager setup. Start the job managers as separate processes in order to not run the
+		// actors postStop, which cleans up all running jobs.
+		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
+
+		LeaderRetrievalService leaderRetrievalService = null;
+
+		ActorSystem taskManagerSystem = null;
+
+		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			TestingUtils.defaultExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Test actor system
+			testSystem = AkkaUtils.createActorSystem(new Configuration(),
+					new Some<>(new Tuple2<String, Object>("localhost", 0)));
+
+			// The job managers
+			jobManagerProcess[0] = new JobManagerProcess(0, config);
+			jobManagerProcess[1] = new JobManagerProcess(1, config);
+
+			jobManagerProcess[0].startProcess();
+			jobManagerProcess[1].startProcess();
+
+			// Leader listener
+			TestingListener leaderListener = new TestingListener();
+			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+			leaderRetrievalService.start(leaderListener);
+
+			// The task manager
+			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+			TaskManager.startTaskManagerComponentsAndActor(
+				config,
+				ResourceID.generate(),
+				taskManagerSystem,
+				highAvailabilityServices,
+				new NoOpMetricRegistry(),
+				"localhost",
+				Option.<String>empty(),
+				false,
+				TaskManager.class);
+
+			// Client test actor
+			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
+					testSystem, Props.create(RecordingTestClient.class));
+
+			JobGraph jobGraph = createBlockingJobGraph();
+
+			{
+				// Initial submission
+				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+				String leaderAddress = leaderListener.getAddress();
+				UUID leaderId = leaderListener.getLeaderSessionID();
+
+				// The client
+				AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
+
+				// Get the leader ref
+				ActorRef leaderRef = AkkaUtils.getActorRef(
+						leaderAddress, testSystem, deadline.timeLeft());
+				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+
+				int numSlots = 0;
+				while (numSlots == 0) {
+					Future<?> slotsFuture = leader.ask(JobManagerMessages
+							.getRequestTotalNumberOfSlots(), deadline.timeLeft());
+
+					numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
+				}
+
+				// Submit the job in non-detached mode
+				leader.tell(new SubmitJob(jobGraph,
+						ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
+
+				JobManagerActorTestUtils.waitForJobStatus(
+						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+			}
+
+			// Who's the boss?
+			JobManagerProcess leadingJobManagerProcess;
+			if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
+				leadingJobManagerProcess = jobManagerProcess[0];
+			}
+			else {
+				leadingJobManagerProcess = jobManagerProcess[1];
+			}
+
+			// Kill the leading job manager process
+			leadingJobManagerProcess.destroy();
+
+			{
+				// Recovery by the standby JobManager
+				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+				String leaderAddress = leaderListener.getAddress();
+				UUID leaderId = leaderListener.getLeaderSessionID();
+
+				ActorRef leaderRef = AkkaUtils.getActorRef(
+						leaderAddress, testSystem, deadline.timeLeft());
+				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+
+				JobManagerActorTestUtils.waitForJobStatus(
+						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+
+				// Cancel the job
+				leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+			}
+
+			// Wait for the execution result
+			clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());
+
+			int jobSubmitSuccessMessages = 0;
+			for (Object msg : clientRef.underlyingActor().getMessages()) {
+				if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
+					jobSubmitSuccessMessages++;
+				}
+			}
+
+			// At least two submissions should be ack-ed (initial and recovery). This is quite
+			// conservative, but it is still possible that these messages are overtaken by the
+			// final message.
+			assertEquals(2, jobSubmitSuccessMessages);
+		}
+		catch (Throwable t) {
+			// Print early (in some situations the process logs get too big
+			// for Travis and the root problem is not shown)
+			t.printStackTrace();
+
+			// In case of an error, print the job manager process logs.
+			if (jobManagerProcess[0] != null) {
+				jobManagerProcess[0].printProcessLog();
+			}
+
+			if (jobManagerProcess[1] != null) {
+				jobManagerProcess[1].printProcessLog();
+			}
+
+			throw t;
+		}
+		finally {
+			if (jobManagerProcess[0] != null) {
+				jobManagerProcess[0].destroy();
+			}
+
+			if (jobManagerProcess[1] != null) {
+				jobManagerProcess[1].destroy();
+			}
+
+			if (leaderRetrievalService != null) {
+				leaderRetrievalService.stop();
+			}
+
+			if (taskManagerSystem != null) {
+				taskManagerSystem.shutdown();
+			}
+
+			if (testSystem != null) {
+				testSystem.shutdown();
+			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
+		}
+	}
+
+	/**
+	 * Simple recording client.
+	 */
+	private static class RecordingTestClient extends UntypedActor {
+
+		private final Queue<Object> messages = new ConcurrentLinkedQueue<>();
+
+		private CountDownLatch jobResultLatch = new CountDownLatch(1);
+
+		@Override
+		public void onReceive(Object message) throws Exception {
+			if (message instanceof LeaderSessionMessage) {
+				message = ((LeaderSessionMessage) message).message();
+			}
+
+			messages.add(message);
+
+			// Check for job result
+			if (message instanceof JobManagerMessages.JobResultFailure ||
+					message instanceof JobManagerMessages.JobResultSuccess) {
+
+				jobResultLatch.countDown();
+			}
+		}
+
+		public Queue<Object> getMessages() {
+			return messages;
+		}
+
+		public void awaitJobResult(long timeout) throws InterruptedException {
+			jobResultLatch.await(timeout, TimeUnit.MILLISECONDS);
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a simple blocking JobGraph.
+	 */
+	private static JobGraph createBlockingJobGraph() {
+		JobGraph jobGraph = new JobGraph("Blocking program");
+
+		JobVertex jobVertex = new JobVertex("Blocking Vertex");
+		jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+		jobGraph.addVertex(jobVertex);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
+	 */
+	private void verifyCleanRecoveryState(Configuration config) throws Exception {
+		// File state backend empty
+		Collection<File> stateHandles = FileUtils.listFiles(
+				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+
+		if (!stateHandles.isEmpty()) {
+			fail("File state backend is not clean: " + stateHandles);
+		}
+
+		// ZooKeeper
+		String currentJobsPath = config.getString(
+				HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
+
+		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
+
+		if (stat.getCversion() == 0) {
+			// Sanity check: verify that some changes have been performed
+			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
+					"this test. What are you testing?");
+		}
+
+		if (stat.getNumChildren() != 0) {
+			// Is everything clean again?
+			fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
+					ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
+		}
+	}
+
+	/**
+	 * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
+	 */
+	private void verifyRecoveryState(Configuration config) throws Exception {
+		// File state backend empty
+		Collection<File> stateHandles = FileUtils.listFiles(
+				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+
+		if (stateHandles.isEmpty()) {
+			fail("File state backend has been cleaned: " + stateHandles);
+		}
+
+		// ZooKeeper
+		String currentJobsPath = config.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
+
+		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
+
+		if (stat.getCversion() == 0) {
+			// Sanity check: verify that some changes have been performed
+			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
+				"this test. What are you testing?");
+		}
+
+		if (stat.getNumChildren() == 0) {
+			// Children have been cleaned up?
+			fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
+				ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
deleted file mode 100644
index 61897d1..0000000
--- a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-shaded-curator</artifactId>
-		<version>1.4-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-shaded-curator-recipes</artifactId>
-	<name>flink-shaded-curator-recipes</name>
-
-	<packaging>jar</packaging>
-
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-recipes</artifactId>
-			<version>${curator.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet combine.self="override">
-								<includes>
-									<include>com.google.guava:*</include>
-									<include>org.apache.curator:*</include>
-								</includes>
-							</artifactSet>
-							<relocations>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.curator.shaded.com.google</shadedPattern>
-									<excludes>
-										<exclude>com.google.protobuf.**</exclude>
-										<exclude>com.google.inject.**</exclude>
-									</excludes>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-shaded-curator/flink-shaded-curator-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/flink-shaded-curator-test/pom.xml b/flink-shaded-curator/flink-shaded-curator-test/pom.xml
deleted file mode 100644
index 2a18162..0000000
--- a/flink-shaded-curator/flink-shaded-curator-test/pom.xml
+++ /dev/null
@@ -1,95 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-shaded-curator</artifactId>
-		<version>1.4-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-shaded-curator-test</artifactId>
-	<name>flink-shaded-curator-test</name>
-
-	<packaging>jar</packaging>
-
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-test</artifactId>
-			<version>${curator.version}</version>
-		</dependency>
-
-		<!-- Use Flink's Guava version here to avoid too many guava versions in Flink -->
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet combine.self="override">
-								<excludes>
-									<exclude>log4j</exclude>
-									<exclude>org.slf4j:slf4j-log4j12</exclude>
-								</excludes>
-								<includes combine.children="append">
-									<include>org.apache.curator:curator-test</include>
-									<include>com.google.guava:guava</include>
-								</includes>
-							</artifactSet>
-							<relocations combine.children="append">
-								<relocation>
-									<pattern>org.apache.curator</pattern>
-									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.curator.shaded.com.google</shadedPattern>
-									<excludes>
-										<exclude>com.google.protobuf.**</exclude>
-										<exclude>com.google.inject.**</exclude>
-									</excludes>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-shaded-curator/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/pom.xml b/flink-shaded-curator/pom.xml
deleted file mode 100644
index d08320d..0000000
--- a/flink-shaded-curator/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.4-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<modules>
-		<module>flink-shaded-curator-recipes</module>
-		<module>flink-shaded-curator-test</module>
-	</modules>
-
-	<artifactId>flink-shaded-curator</artifactId>
-	<name>flink-shaded-curator</name>
-
-	<packaging>pom</packaging>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 4e50a9d..8e10a2e 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -200,9 +200,9 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-curator-test</artifactId>
-			<version>${project.version}</version>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
deleted file mode 100644
index ee37d6d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
-import org.apache.flink.runtime.leaderelection.TestingListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
-import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.testkit.TestActorRef;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.TrueFileFilter;
-import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Tests recovery of {@link SubmittedJobGraph} instances.
- */
-public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
-
-	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
-
-	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		ZooKeeper.shutdown();
-	}
-
-	@Before
-	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	/**
-	 * Tests that the HA job is not cleaned up when the jobmanager is stopped.
-	 */
-	@Test
-	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
-
-		// Configure the cluster
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-
-		TestingCluster flink = new TestingCluster(config, false, false);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Start the JobManager and TaskManager
-			flink.start(true);
-
-			JobGraph jobGraph = createBlockingJobGraph();
-
-			// Set restart strategy to guard against shut down races.
-			// If the TM fails before the JM, it might happen that the
-			// Job is failed, leading to state removal.
-			ExecutionConfig ec = new ExecutionConfig();
-			ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100));
-			jobGraph.setExecutionConfig(ec);
-
-			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
-
-			// Submit the job
-			jobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
-
-			// Wait for the job to start
-			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
-					jobManager, deadline.timeLeft());
-		}
-		finally {
-			flink.stop();
-		}
-
-		// verify that the persisted job data has not been removed from ZooKeeper when the JM has
-		// been shutdown
-		verifyRecoveryState(config);
-	}
-
-	/**
-	 * Tests that clients receive updates after recovery by a new leader.
-	 */
-	@Test
-	public void testClientNonDetachedListeningBehaviour() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
-
-		// Test actor system
-		ActorSystem testSystem = null;
-
-		// JobManager setup. Start the job managers as separate processes in order to not run the
-		// actors postStop, which cleans up all running jobs.
-		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
-
-		LeaderRetrievalService leaderRetrievalService = null;
-
-		ActorSystem taskManagerSystem = null;
-
-		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-			config,
-			TestingUtils.defaultExecutor(),
-			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Test actor system
-			testSystem = AkkaUtils.createActorSystem(new Configuration(),
-					new Some<>(new Tuple2<String, Object>("localhost", 0)));
-
-			// The job managers
-			jobManagerProcess[0] = new JobManagerProcess(0, config);
-			jobManagerProcess[1] = new JobManagerProcess(1, config);
-
-			jobManagerProcess[0].startProcess();
-			jobManagerProcess[1].startProcess();
-
-			// Leader listener
-			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-			leaderRetrievalService.start(leaderListener);
-
-			// The task manager
-			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-			TaskManager.startTaskManagerComponentsAndActor(
-				config,
-				ResourceID.generate(),
-				taskManagerSystem,
-				highAvailabilityServices,
-				new NoOpMetricRegistry(),
-				"localhost",
-				Option.<String>empty(),
-				false,
-				TaskManager.class);
-
-			// Client test actor
-			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
-					testSystem, Props.create(RecordingTestClient.class));
-
-			JobGraph jobGraph = createBlockingJobGraph();
-
-			{
-				// Initial submission
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				// The client
-				AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
-
-				// Get the leader ref
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				int numSlots = 0;
-				while (numSlots == 0) {
-					Future<?> slotsFuture = leader.ask(JobManagerMessages
-							.getRequestTotalNumberOfSlots(), deadline.timeLeft());
-
-					numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
-				}
-
-				// Submit the job in non-detached mode
-				leader.tell(new SubmitJob(jobGraph,
-						ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
-
-				JobManagerActorTestUtils.waitForJobStatus(
-						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-			}
-
-			// Who's the boss?
-			JobManagerProcess leadingJobManagerProcess;
-			if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
-				leadingJobManagerProcess = jobManagerProcess[0];
-			}
-			else {
-				leadingJobManagerProcess = jobManagerProcess[1];
-			}
-
-			// Kill the leading job manager process
-			leadingJobManagerProcess.destroy();
-
-			{
-				// Recovery by the standby JobManager
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				JobManagerActorTestUtils.waitForJobStatus(
-						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-
-				// Cancel the job
-				leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
-			}
-
-			// Wait for the execution result
-			clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());
-
-			int jobSubmitSuccessMessages = 0;
-			for (Object msg : clientRef.underlyingActor().getMessages()) {
-				if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
-					jobSubmitSuccessMessages++;
-				}
-			}
-
-			// At least two submissions should be ack-ed (initial and recovery). This is quite
-			// conservative, but it is still possible that these messages are overtaken by the
-			// final message.
-			assertEquals(2, jobSubmitSuccessMessages);
-		}
-		catch (Throwable t) {
-			// Print early (in some situations the process logs get too big
-			// for Travis and the root problem is not shown)
-			t.printStackTrace();
-
-			// In case of an error, print the job manager process logs.
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].printProcessLog();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].printProcessLog();
-			}
-
-			throw t;
-		}
-		finally {
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].destroy();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].destroy();
-			}
-
-			if (leaderRetrievalService != null) {
-				leaderRetrievalService.stop();
-			}
-
-			if (taskManagerSystem != null) {
-				taskManagerSystem.shutdown();
-			}
-
-			if (testSystem != null) {
-				testSystem.shutdown();
-			}
-
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-	}
-
-	/**
-	 * Simple recording client.
-	 */
-	private static class RecordingTestClient extends UntypedActor {
-
-		private final Queue<Object> messages = new ConcurrentLinkedQueue<>();
-
-		private CountDownLatch jobResultLatch = new CountDownLatch(1);
-
-		@Override
-		public void onReceive(Object message) throws Exception {
-			if (message instanceof LeaderSessionMessage) {
-				message = ((LeaderSessionMessage) message).message();
-			}
-
-			messages.add(message);
-
-			// Check for job result
-			if (message instanceof JobManagerMessages.JobResultFailure ||
-					message instanceof JobManagerMessages.JobResultSuccess) {
-
-				jobResultLatch.countDown();
-			}
-		}
-
-		public Queue<Object> getMessages() {
-			return messages;
-		}
-
-		public void awaitJobResult(long timeout) throws InterruptedException {
-			jobResultLatch.await(timeout, TimeUnit.MILLISECONDS);
-		}
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates a simple blocking JobGraph.
-	 */
-	private static JobGraph createBlockingJobGraph() {
-		JobGraph jobGraph = new JobGraph("Blocking program");
-
-		JobVertex jobVertex = new JobVertex("Blocking Vertex");
-		jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
-
-		jobGraph.addVertex(jobVertex);
-
-		return jobGraph;
-	}
-
-	/**
-	 * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
-	 */
-	private void verifyCleanRecoveryState(Configuration config) throws Exception {
-		// File state backend empty
-		Collection<File> stateHandles = FileUtils.listFiles(
-				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
-
-		if (!stateHandles.isEmpty()) {
-			fail("File state backend is not clean: " + stateHandles);
-		}
-
-		// ZooKeeper
-		String currentJobsPath = config.getString(
-				HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
-
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
-
-		if (stat.getCversion() == 0) {
-			// Sanity check: verify that some changes have been performed
-			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
-					"this test. What are you testing?");
-		}
-
-		if (stat.getNumChildren() != 0) {
-			// Is everything clean again?
-			fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
-					ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
-		}
-	}
-
-	/**
-	 * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
-	 */
-	private void verifyRecoveryState(Configuration config) throws Exception {
-		// File state backend empty
-		Collection<File> stateHandles = FileUtils.listFiles(
-				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
-
-		if (stateHandles.isEmpty()) {
-			fail("File state backend has been cleaned: " + stateHandles);
-		}
-
-		// ZooKeeper
-		String currentJobsPath = config.getString(
-			HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
-
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
-
-		if (stat.getCversion() == 0) {
-			// Sanity check: verify that some changes have been performed
-			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
-				"this test. What are you testing?");
-		}
-
-		if (stat.getNumChildren() == 0) {
-			// Children have been cleaned up?
-			fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
-				ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d028236/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9cf603a..a5f99f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,6 @@ under the License.
 		<module>tools/force-shading</module>
 		<module>flink-annotations</module>
 		<module>flink-shaded-hadoop</module>
-		<module>flink-shaded-curator</module>
 		<module>flink-core</module>
 		<module>flink-java</module>
 		<module>flink-java8</module>


[2/3] flink git commit: [FLINK-7778] [build] Shade ZooKeeper dependency (part 2)

Posted by se...@apache.org.
[FLINK-7778] [build] Shade ZooKeeper dependency (part 2)

This closes #4927


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d368a07a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d368a07a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d368a07a

Branch: refs/heads/master
Commit: d368a07a2146c45a7f994cab0bb55524432a843d
Parents: 4d02823
Author: zentol <ch...@apache.org>
Authored: Tue Oct 31 11:26:48 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 2 16:51:25 2017 +0100

----------------------------------------------------------------------
 .../store/ZooKeeperMesosWorkerStore.java        | 11 ++-
 flink-runtime/pom.xml                           | 27 +++---
 .../runtime/zookeeper/ZookeeperAccess.java      | 66 +++++++++++++++
 flink-shaded-curator-recipes/pom.xml            | 88 ++++++++++++++++++++
 flink-shaded-hadoop/pom.xml                     |  5 --
 pom.xml                                         |  1 +
 tools/travis_mvn_watchdog.sh                    | 16 ++++
 7 files changed, 188 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
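
In summary, part 2 introduces a flink-shaded-curator-recipes module that bundles curator-recipes with a minimal relocated Guava, switches flink-runtime to that module, relocates the ZooKeeper and Curator packages inside the flink-runtime fat jar, adds the ZookeeperAccess utility so that other modules no longer reference org.apache.zookeeper classes directly, drops the provided ZooKeeper dependency from flink-shaded-hadoop, and extends the Travis watchdog to fail when unshaded ZooKeeper or Curator classes show up in the fat jar.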


http://git-wip-us.apache.org/repos/asf/flink/blob/d368a07a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 92e4416..738f99e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -25,17 +25,16 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
+import org.apache.flink.runtime.zookeeper.ZookeeperAccess;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.mesos.Protos;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.ConcurrentModificationException;
 import java.util.List;
 
 import scala.Option;
@@ -218,15 +217,15 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 				try {
 					workersInZooKeeper.addAndLock(path, worker);
 					LOG.debug("Added {} in ZooKeeper.", worker);
-				} catch (KeeperException.NodeExistsException ex) {
-					throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
+				} catch (Exception ex) {
+					throw ZookeeperAccess.wrapIfZooKeeperNodeExistsException(ex);
 				}
 			} else {
 				try {
 					workersInZooKeeper.replace(path, currentVersion, worker);
 					LOG.debug("Updated {} in ZooKeeper.", worker);
-				} catch (KeeperException.NoNodeException ex) {
-					throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
+				} catch (Exception ex) {
+					throw ZookeeperAccess.wrapIfZooKeeperNoNodeException(ex);
 				}
 			}
 		}
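
The hunk above replaces catches of the concrete KeeperException subclasses with catches of a plain Exception routed through the new ZookeeperAccess helper: flink-mesos does not apply flink-runtime's relocation pattern, so once ZooKeeper is shaded it can no longer reference org.apache.zookeeper.KeeperException under its original name. A minimal sketch of the resulting calling pattern ('store', 'path' and 'worker' are illustrative placeholders, not identifiers from this commit; the enclosing method is assumed to declare 'throws Exception' as in the store above):

	try {
		store.addAndLock(path, worker);
	} catch (Exception ex) {
		// Only a NodeExistsException is rewrapped as a ConcurrentModificationException;
		// any other exception is returned by the helper and rethrown unchanged.
		throw ZookeeperAccess.wrapIfZooKeeperNodeExistsException(ex);
	}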

http://git-wip-us.apache.org/repos/asf/flink/blob/d368a07a/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 46990a9..481de6d 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -178,17 +178,9 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-recipes</artifactId>
-			<version>${curator.version}</version>
-			<exclusions>
-				<!-- curator shades guava, but still has a dependency on it. -->
-				<!-- We can safely exclude Guava here -->
-				<exclusion>
-					<groupId>com.google.guava</groupId>
-					<artifactId>guava</artifactId>
-				</exclusion>
-			</exclusions>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-curator-recipes</artifactId>
+			<version>${project.version}</version>
 		</dependency>
 
 		<!-- test dependencies -->
@@ -448,7 +440,6 @@ under the License.
 									<include>com.typesafe.akka:akka-remote_*</include>
 									<include>io.netty:netty</include>
 									<include>org.uncommons.maths:uncommons-maths</include>
-									<include>org.apache.curator:*</include>
 									<include>org.apache.zookeeper:*</include>
 								</includes>
 							</artifactSet>
@@ -463,16 +454,22 @@ under the License.
 								</relocation>
 								<relocation>
 									<pattern>org.apache.curator</pattern>
-									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
+									<shadedPattern>org.apache.flink.shaded.curator.org.apache.curator</shadedPattern>
+									<excludes>
+										<!-- Do not relocate curator-test. This leads to problems for downstream
+											users of runtime test classes that make use of it as the relocated
+											dependency is not included in the test-jar.-->
+										<exclude>org.apache.curator.test.*</exclude>
+									</excludes>
 								</relocation>
 								<relocation>
 									<pattern>org.apache.zookeeper</pattern>
-									<shadedPattern>org.apache.flink.shaded.org.apache.zookeeper</shadedPattern>
+									<shadedPattern>org.apache.flink.shaded.zookeeper.org.apache.zookeeper</shadedPattern>
 								</relocation>
 								<!-- jute is already shaded into the ZooKeeper jar -->
 								<relocation>
 									<pattern>org.apache.jute</pattern>
-									<shadedPattern>org.apache.flink.shaded.org.apache.zookeeper.jute</shadedPattern>
+									<shadedPattern>org.apache.flink.shaded.zookeeper.org.apache.zookeeper.jute</shadedPattern>
 								</relocation>
 							</relocations>
 							<filters>
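
With the updated patterns, the Curator and ZooKeeper classes bundled into the flink-runtime fat jar are relocated to org.apache.flink.shaded.curator.org.apache.curator and org.apache.flink.shaded.zookeeper.org.apache.zookeeper respectively, while org.apache.curator.test.* is deliberately left in place for downstream users of the runtime test classes. A purely illustrative way to confirm the relocation, assuming a built flink-runtime fat jar is on the classpath (this snippet is not part of the commit):

	// resolves the ZooKeeper exception class under its relocated name
	Class<?> relocated = Class.forName(
			"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException");
	System.out.println("shaded ZooKeeper available as " + relocated.getName());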

http://git-wip-us.apache.org/repos/asf/flink/blob/d368a07a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZookeeperAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZookeeperAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZookeeperAccess.java
new file mode 100644
index 0000000..2c6160a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZookeeperAccess.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.zookeeper.KeeperException;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * Utility class providing access to relocated zookeeper classes.
+ *
+ * <p>This class is necessary as flink-runtime relocates its ZooKeeper dependency.
+ * Other modules may still depend on this dependency but will encounter a ClassNotFoundException
+ * on access as they don't apply the relocation pattern of flink-runtime.
+ */
+public final class ZookeeperAccess {
+	
+	private ZookeeperAccess(){
+	}
+
+	/**
+	 * Wraps and returns the given exception in a {@link ConcurrentModificationException} if it is a
+	 * {@link org.apache.zookeeper.KeeperException.NodeExistsException}. Otherwise the
+	 * given exception is returned.
+	 *
+	 * @param ex exception to wrap
+	 * @return wrapping ConcurrentModificationException if it is a NodeExistsException, otherwise the given exception
+	 */
+	public static Exception wrapIfZooKeeperNodeExistsException(Exception ex) {
+		if (ex instanceof KeeperException.NodeExistsException) {
+			return new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
+		}
+		return ex;
+	}
+
+	/**
+	 * Wraps and returns the given exception in a {@link ConcurrentModificationException} if it is a
+	 * {@link org.apache.zookeeper.KeeperException.NoNodeException}. Otherwise the
+	 * given exception is returned.
+	 *
+	 * @param ex exception to wrap
+	 * @return wrapping ConcurrentModificationException if it is a NoNodeException, otherwise the given exception
+	 */
+	public static Exception wrapIfZooKeeperNoNodeException(Exception ex) {
+		if (ex instanceof KeeperException.NoNodeException) {
+			return new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
+		}
+		return ex;
+	}
+}
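
The helper rewraps only the two ZooKeeper exception types it names and hands every other exception back untouched. A small sketch of that contract (a hypothetical flink-runtime test, assuming JUnit's assertSame/assertTrue are on the test classpath; not part of the commit):

	Exception plain = new Exception("unrelated failure");
	// anything other than the named KeeperException subtypes is returned as-is
	assertSame(plain, ZookeeperAccess.wrapIfZooKeeperNoNodeException(plain));

	Exception noNode = new KeeperException.NoNodeException("/flink/jobgraphs");
	// a NoNodeException comes back wrapped in a ConcurrentModificationException
	assertTrue(ZookeeperAccess.wrapIfZooKeeperNoNodeException(noNode)
			instanceof ConcurrentModificationException);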

http://git-wip-us.apache.org/repos/asf/flink/blob/d368a07a/flink-shaded-curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator-recipes/pom.xml b/flink-shaded-curator-recipes/pom.xml
new file mode 100644
index 0000000..06d3821
--- /dev/null
+++ b/flink-shaded-curator-recipes/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-shaded-curator-recipes</artifactId>
+	<name>flink-shaded-curator-recipes</name>
+
+	<packaging>jar</packaging>
+
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-recipes</artifactId>
+			<version>${curator.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>com.google.guava:guava</include>
+									<include>org.apache.curator:*</include>
+								</includes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>com.google.guava:guava</artifact>
+									<!-- Shade guava classes that are not included by curator -->
+									<includes>
+										<include>com/google/common/base/Function.class</include>
+										<include>com/google/common/base/Predicate.class</include>
+										<include>com/google/common/reflect/TypeToken.class</include>
+									</includes>
+								</filter>
+							</filters>
+							<relocations>
+								<relocation>
+									<pattern>com.google.common</pattern>
+									<shadedPattern>org.apache.flink.curator.shaded.com.google.common</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
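
The new module repackages curator-recipes together with only the three Guava classes that the filter above lets through (per the comment, the ones not already included by Curator). After the relocation they are available as

	org.apache.flink.curator.shaded.com.google.common.base.Function
	org.apache.flink.curator.shaded.com.google.common.base.Predicate
	org.apache.flink.curator.shaded.com.google.common.reflect.TypeToken

so they cannot clash with a Guava version provided by user code.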

http://git-wip-us.apache.org/repos/asf/flink/blob/d368a07a/flink-shaded-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml
index f4b0816..ba90fc9 100644
--- a/flink-shaded-hadoop/pom.xml
+++ b/flink-shaded-hadoop/pom.xml
@@ -53,11 +53,6 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.zookeeper</groupId>
-			<artifactId>zookeeper</artifactId>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
 			<groupId>org.apache.avro</groupId>
 			<artifactId>avro</artifactId>
 			<scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/d368a07a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a5f99f7..faddab4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@ under the License.
 		<module>tools/force-shading</module>
 		<module>flink-annotations</module>
 		<module>flink-shaded-hadoop</module>
+		<module>flink-shaded-curator-recipes</module>
 		<module>flink-core</module>
 		<module>flink-java</module>
 		<module>flink-java8</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/d368a07a/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index b7e3709..978bc9f 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -325,6 +325,22 @@ check_shaded_artifacts() {
 		return 1
 	fi
 
+	ZOOKEEPER=`cat allClasses | grep '^org/apache/zookeeper' | wc -l`
+	if [ "$ZOOKEEPER" != "0" ]; then
+		echo "=============================================================================="
+		echo "Detected '$ZOOKEEPER' unshaded org.apache.zookeeper classes in fat jar"
+		echo "=============================================================================="
+		return 1
+	fi
+
+	CURATOR=`cat allClasses | grep '^org/apache/curator' | wc -l`
+	if [ "$CURATOR" != "0" ]; then
+		echo "=============================================================================="
+		echo "Detected '$CURATOR' unshaded org.apache.curator classes in fat jar"
+		echo "=============================================================================="
+		return 1
+	fi
+
 	FLINK_PYTHON=`cat allClasses | grep '^org/apache/flink/python' | wc -l`
 	if [ "$FLINK_PYTHON" != "0" ]; then
 		echo "=============================================================================="