You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/23 12:26:26 UTC

[1/3] flink git commit: [FLINK-5496] [mesos] Relocate Mesos Protobuf dependency to avoid version conflicts

Repository: flink
Updated Branches:
  refs/heads/release-1.2 099cdd010 -> 9fc1fe017


[FLINK-5496] [mesos] Relocate Mesos Protobuf dependency to avoid version conflicts

Only relocate Mesos Protobuf dependency in flink-mesos. This avoids problems with Mesos
because Flink pulls in Protobuf 2.5.0 via Flakka.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0252126
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0252126
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0252126

Branch: refs/heads/release-1.2
Commit: b02521262322098a7feb2a8bdb7442c21edeefd6
Parents: 404d425
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jan 16 14:01:10 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 10:24:24 2017 +0100

----------------------------------------------------------------------
 flink-mesos/pom.xml   | 17 ++++++-----------
 flink-runtime/pom.xml |  1 -
 2 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0252126/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 69e0c84..d1f2701 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -255,7 +255,9 @@ under the License.
 				</configuration>
 			</plugin>
 
-			<!-- Relocate curator -->
+			<!-- Relocate Mesos Protobuf dependency. Mesos 1.0.1 requires Protobuf 2.6.1 whereas
+			 Flakka pulls in Protobuf 2.5.0. It might be feasible to set Protobuf to version 2.6.1,
+			 but we shade to be on the safe side. -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-shade-plugin</artifactId>
@@ -270,21 +272,14 @@ under the License.
 							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
 							<artifactSet>
 								<includes combine.children="append">
-									<include>org.apache.flink:flink-shaded-curator-recipes</include>
 									<include>com.google.protobuf:*</include>
-									<include>com.google.guava:guava</include>
 									<include>org.apache.mesos:*</include>
-									<include>com.netflix.fenzo:*</include>
 								</includes>
 							</artifactSet>
-							<relocations combine.children="override">
+							<relocations combine.children="append">
 								<relocation>
-									<pattern>org.apache.curator</pattern>
-									<shadedPattern>org.apache.flink.mesos.shaded.org.apache.curator</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.mesos.shaded.com.google</shadedPattern>
+									<pattern>com.google.protobuf</pattern>
+									<shadedPattern>org.apache.flink.mesos.shaded.com.google.protobuf</shadedPattern>
 								</relocation>
 							</relocations>
 						</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/b0252126/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index ab1ff6b..bb87c23 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -374,7 +374,6 @@ under the License.
 							<goal>shade</goal>
 						</goals>
 						<configuration>
-						
 							<artifactSet>
 								<includes combine.children="append">
 									<include>org.apache.flink:flink-shaded-curator-recipes</include>


[2/3] flink git commit: [FLINK-5495] [mesos] Provide executor to ZooKeeperMesosWorkerStore

Posted by tr...@apache.org.
[FLINK-5495] [mesos] Provide executor to ZooKeeperMesosWorkerStore

The ZooKeeperMesosWorkerStore instantiates a ZooKeeperStateHandleStore which requires an
Executor instance. This executor is now given to the ZooKeeperMesosWorkerStore.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/404d4252
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/404d4252
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/404d4252

Branch: refs/heads/release-1.2
Commit: 404d425294184978979f79713727d871bf6516dd
Parents: 099cdd0
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jan 16 14:14:18 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 10:24:24 2017 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  7 ++++---
 .../store/ZooKeeperMesosWorkerStore.java        | 21 ++++++++++++--------
 2 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/404d4252/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 689c26a..c9b6eed 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -73,6 +73,7 @@ import java.net.URL;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -315,7 +316,7 @@ public class MesosApplicationMasterRunner {
 			LOG.debug("Starting Mesos Flink Resource Manager");
 
 			// create the worker store to persist task information across restarts
-			MesosWorkerStore workerStore = createWorkerStore(config);
+			MesosWorkerStore workerStore = createWorkerStore(config, ioExecutor);
 
 			// we need the leader retrieval service here to be informed of new
 			// leader session IDs, even though there can be only one leader ever
@@ -497,7 +498,7 @@ public class MesosApplicationMasterRunner {
 		return mesos;
 	}
 
-	private static MesosWorkerStore createWorkerStore(Configuration flinkConfig) throws Exception {
+	private static MesosWorkerStore createWorkerStore(Configuration flinkConfig, Executor executor) throws Exception {
 		MesosWorkerStore workerStore;
 		HighAvailabilityMode recoveryMode = HighAvailabilityMode.fromConfig(flinkConfig);
 		if (recoveryMode == HighAvailabilityMode.NONE) {
@@ -506,7 +507,7 @@ public class MesosApplicationMasterRunner {
 		else if (recoveryMode == HighAvailabilityMode.ZOOKEEPER) {
 			// note: the store is responsible for closing the client.
 			CuratorFramework client = ZooKeeperUtils.startCuratorFramework(flinkConfig);
-			workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig);
+			workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig, executor);
 		}
 		else {
 			throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode + ".");

http://git-wip-us.apache.org/repos/asf/flink/blob/404d4252/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 551852e..cd88979 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -72,10 +73,10 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 	@SuppressWarnings("unchecked")
 	ZooKeeperMesosWorkerStore(
-		CuratorFramework client,
-		String storePath,
-		RetrievableStateStorageHelper<Worker> stateStorage
-	) throws Exception {
+			CuratorFramework client,
+			String storePath,
+			RetrievableStateStorageHelper<Worker> stateStorage,
+			Executor executor) throws Exception {
 		checkNotNull(storePath, "storePath");
 		checkNotNull(stateStorage, "stateStorage");
 
@@ -100,8 +101,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 		// using late-binding as a workaround for shaded curator dependency of flink-runtime.
 		this.workersInZooKeeper = ZooKeeperStateHandleStore.class
-			.getConstructor(CuratorFramework.class, RetrievableStateStorageHelper.class)
-			.newInstance(storeFacade, stateStorage);
+			.getConstructor(CuratorFramework.class, RetrievableStateStorageHelper.class, Executor.class)
+			.newInstance(storeFacade, stateStorage, executor);
 	}
 
 	@Override
@@ -284,7 +285,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 	 */
 	public static ZooKeeperMesosWorkerStore createMesosWorkerStore(
 			CuratorFramework client,
-			Configuration configuration) throws Exception {
+			Configuration configuration,
+			Executor executor) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -297,6 +299,9 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		);
 
 		return new ZooKeeperMesosWorkerStore(
-			client, zooKeeperMesosWorkerStorePath, stateStorage);
+			client,
+			zooKeeperMesosWorkerStorePath,
+			stateStorage,
+			executor);
 	}
 }


[3/3] flink git commit: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFactory to create ZooKeeper utility classes

Posted by tr...@apache.org.
[FLINK-5508] [mesos] Introduce ZooKeeperUtilityFactory to create ZooKeeper utility classes

This commit adds utility classes to abstract the CuratorFramework dependency from ZooKeeper
utility classes away. That way it is possible for modules outside of flink-runtime to use
these utility classes without facing the problem of a relocated curator dependency.

Address PR comments

This closes #3158.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fc1fe01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fc1fe01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fc1fe01

Branch: refs/heads/release-1.2
Commit: 9fc1fe01798537b623a1a3797e2e8c0967d4673c
Parents: b025212
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 18 15:06:12 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 10:24:25 2017 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  47 ++++---
 .../services/MesosServices.java                 |  51 ++++++++
 .../services/MesosServicesUtils.java            |  57 +++++++++
 .../services/StandaloneMesosServices.java       |  39 ++++++
 .../services/ZooKeeperMesosServices.java        |  70 +++++++++++
 .../store/ZooKeeperMesosWorkerStore.java        |  93 ++------------
 .../MesosFlinkResourceManagerTest.java          |  12 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |  11 +-
 .../runtime/zookeeper/ZooKeeperSharedCount.java |  53 ++++++++
 .../runtime/zookeeper/ZooKeeperSharedValue.java |  53 ++++++++
 .../zookeeper/ZooKeeperUtilityFactory.java      | 123 +++++++++++++++++++
 .../zookeeper/ZooKeeperVersionedValue.java      |  43 +++++++
 12 files changed, 541 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index c9b6eed..de76d8e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -26,18 +26,16 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
-import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
-import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
 import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.mesos.util.ZooKeeperUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
@@ -48,7 +46,6 @@ import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -73,7 +70,6 @@ import java.net.URL;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -201,6 +197,7 @@ public class MesosApplicationMasterRunner {
 		MesosArtifactServer artifactServer = null;
 		ExecutorService futureExecutor = null;
 		ExecutorService ioExecutor = null;
+		MesosServices mesosServices = null;
 
 		try {
 			// ------- (1) load and parse / validate all configurations -------
@@ -224,6 +221,8 @@ public class MesosApplicationMasterRunner {
 				numberProcessors,
 				new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
 
+			mesosServices = MesosServicesUtils.createMesosServices(config);
+
 			// TM configuration
 			final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config);
 
@@ -316,7 +315,9 @@ public class MesosApplicationMasterRunner {
 			LOG.debug("Starting Mesos Flink Resource Manager");
 
 			// create the worker store to persist task information across restarts
-			MesosWorkerStore workerStore = createWorkerStore(config, ioExecutor);
+			MesosWorkerStore workerStore = mesosServices.createMesosWorkerStore(
+				config,
+				ioExecutor);
 
 			// we need the leader retrieval service here to be informed of new
 			// leader session IDs, even though there can be only one leader ever
@@ -394,6 +395,14 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
+			if (mesosServices != null) {
+				try {
+					mesosServices.close(false);
+				} catch (Throwable tt) {
+					LOG.error("Error closing the mesos services.", tt);
+				}
+			}
+
 			return INIT_ERROR_EXIT_CODE;
 		}
 
@@ -424,6 +433,12 @@ public class MesosApplicationMasterRunner {
 			futureExecutor,
 			ioExecutor);
 
+		try {
+			mesosServices.close(true);
+		} catch (Throwable t) {
+			LOG.error("Failed to clean up and close MesosServices.", t);
+		}
+
 		return 0;
 	}
 
@@ -498,24 +513,6 @@ public class MesosApplicationMasterRunner {
 		return mesos;
 	}
 
-	private static MesosWorkerStore createWorkerStore(Configuration flinkConfig, Executor executor) throws Exception {
-		MesosWorkerStore workerStore;
-		HighAvailabilityMode recoveryMode = HighAvailabilityMode.fromConfig(flinkConfig);
-		if (recoveryMode == HighAvailabilityMode.NONE) {
-			workerStore = new StandaloneMesosWorkerStore();
-		}
-		else if (recoveryMode == HighAvailabilityMode.ZOOKEEPER) {
-			// note: the store is responsible for closing the client.
-			CuratorFramework client = ZooKeeperUtils.startCuratorFramework(flinkConfig);
-			workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig, executor);
-		}
-		else {
-			throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode + ".");
-		}
-
-		return workerStore;
-	}
-
 	/**
 	 * Generate a container specification as a TaskManager template.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
new file mode 100644
index 0000000..5655bfc
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Service factory interface for Mesos.
+ */
+public interface MesosServices {
+
+	/**
+	 * Creates a {@link MesosWorkerStore} which is used to persist mesos worker in high availability
+	 * mode.
+	 *
+	 * @param configuration to be used
+	 * @param executor to run asynchronous tasks
+	 * @return a mesos worker store
+	 * @throws Exception if the mesos worker store could not be created
+	 */
+	MesosWorkerStore createMesosWorkerStore(
+		Configuration configuration,
+		Executor executor) throws Exception;
+
+	/**
+	 * Closes all state maintained by the mesos services implementation.
+	 *
+	 * @param cleanup is true if a cleanup shall be performed
+	 * @throws Exception if the closing operation failed
+	 */
+	void close(boolean cleanup) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
new file mode 100644
index 0000000..13eb759
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
+
+public class MesosServicesUtils {
+
+	/**
+	 * Creates a {@link MesosServices} instance depending on the high availability settings.
+	 *
+	 * @param configuration containing the high availability settings
+	 * @return a mesos services instance
+	 * @throws Exception if the mesos services instance could not be created
+	 */
+	public static MesosServices createMesosServices(Configuration configuration) throws Exception {
+		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
+
+		switch (highAvailabilityMode) {
+			case NONE:
+				return new StandaloneMesosServices();
+
+			case ZOOKEEPER:
+				final String zkMesosRootPath = configuration.getString(
+					ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
+					ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH);
+
+				ZooKeeperUtilityFactory zooKeeperUtilityFactory = new ZooKeeperUtilityFactory(
+					configuration,
+					zkMesosRootPath);
+
+				return new ZooKeeperMesosServices(zooKeeperUtilityFactory);
+
+			default:
+				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
new file mode 100644
index 0000000..dfbc2c3
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link MesosServices} implementation for the standalone mode.
+ */
+public class StandaloneMesosServices implements MesosServices {
+
+	@Override
+	public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor) {
+		return new StandaloneMesosWorkerStore();
+	}
+
+	@Override
+	public void close(boolean cleanup) throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
new file mode 100644
index 0000000..2883e4f
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
+import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link MesosServices} implementation for the ZooKeeper high availability based mode.
+ */
+public class ZooKeeperMesosServices implements MesosServices {
+
+	// Factory to create ZooKeeper utility classes
+	private final ZooKeeperUtilityFactory zooKeeperUtilityFactory;
+
+	public ZooKeeperMesosServices(ZooKeeperUtilityFactory zooKeeperUtilityFactory) {
+		this.zooKeeperUtilityFactory = Preconditions.checkNotNull(zooKeeperUtilityFactory);
+	}
+
+	@Override
+	public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor) throws Exception {
+		RetrievableStateStorageHelper<MesosWorkerStore.Worker> stateStorageHelper =
+			ZooKeeperUtils.createFileSystemStateStorage(configuration, "mesosWorkerStore");
+
+		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
+			"/workers",
+			stateStorageHelper,
+			executor);
+
+		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
+		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
+
+		return new ZooKeeperMesosWorkerStore(
+			zooKeeperStateHandleStore,
+			frameworkId,
+			totalTaskCount);
+	}
+
+	@Override
+	public void close(boolean cleanup) throws Exception {
+		// this also closes the underlying CuratorFramework instance
+		zooKeeperUtilityFactory.close(cleanup);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index cd88979..5246b94 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -18,17 +18,12 @@
 
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
-import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
+import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
 import org.apache.mesos.Protos;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -39,7 +34,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -53,56 +47,26 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 	private final Object startStopLock = new Object();
 
-	/** Root store path in ZK. */
-	private final String storePath;
-
-	/** Client (not a namespace facade) */
-	private final CuratorFramework client;
-
 	/** Flag indicating whether this instance is running. */
 	private boolean isRunning;
 
 	/** A persistent value of the assigned framework ID */
-	private final SharedValue frameworkIdInZooKeeper;
+	private final ZooKeeperSharedValue frameworkIdInZooKeeper;
 
 	/** A persistent count of all tasks created, for generating unique IDs */
-	private final SharedCount totalTaskCountInZooKeeper;
+	private final ZooKeeperSharedCount totalTaskCountInZooKeeper;
 
 	/** A persistent store of serialized workers */
 	private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper;
 
 	@SuppressWarnings("unchecked")
-	ZooKeeperMesosWorkerStore(
-			CuratorFramework client,
-			String storePath,
-			RetrievableStateStorageHelper<Worker> stateStorage,
-			Executor executor) throws Exception {
-		checkNotNull(storePath, "storePath");
-		checkNotNull(stateStorage, "stateStorage");
-
-		// Keep a reference to the original client and not the namespace facade. The namespace
-		// facade cannot be closed.
-		this.client = checkNotNull(client, "client");
-		this.storePath = storePath;
-
-		// All operations will have the given path as root
-		client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient());
-		CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath);
-
-		// Track the assignd framework ID.
-		frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]);
-
-		// Keep a count of all tasks created ever, as the basis for a unique ID.
-		totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0);
-
-		// Keep track of the workers in state handle storage.
-		facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient());
-		CuratorFramework storeFacade = client.usingNamespace(facade.getNamespace() + "/workers");
-
-		// using late-binding as a workaround for shaded curator dependency of flink-runtime.
-		this.workersInZooKeeper = ZooKeeperStateHandleStore.class
-			.getConstructor(CuratorFramework.class, RetrievableStateStorageHelper.class, Executor.class)
-			.newInstance(storeFacade, stateStorage, executor);
+	public ZooKeeperMesosWorkerStore(
+		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper,
+		ZooKeeperSharedValue frameworkIdInZooKeeper,
+		ZooKeeperSharedCount totalTaskCountInZooKeeper) throws Exception {
+		this.workersInZooKeeper = checkNotNull(workersInZooKeeper, "workersInZooKeeper");
+		this.frameworkIdInZooKeeper = checkNotNull(frameworkIdInZooKeeper, "frameworkIdInZooKeeper");
+		this.totalTaskCountInZooKeeper= checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
 	}
 
 	@Override
@@ -124,10 +88,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 				if(cleanup) {
 					workersInZooKeeper.removeAndDiscardAllState();
-					client.delete().deletingChildrenIfNeeded().forPath(storePath);
 				}
 
-				client.close();
 				isRunning = false;
 			}
 		}
@@ -188,7 +150,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 			int nextCount;
 			boolean success;
 			do {
-				VersionedValue<Integer> count = totalTaskCountInZooKeeper.getVersionedValue();
+				ZooKeeperVersionedValue<Integer> count = totalTaskCountInZooKeeper.getVersionedValue();
 				nextCount = count.getValue() + 1;
 				success = totalTaskCountInZooKeeper.trySetCount(count, nextCount);
 			}
@@ -275,33 +237,4 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		checkNotNull(taskID, "taskID");
 		return String.format("/%s", taskID.getValue());
 	}
-
-	/**
-	 * Create the ZooKeeper-backed Mesos worker store.
-	 * @param client the curator client.
-	 * @param configuration the Flink configuration.
-	 * @return a worker store.
-	 * @throws Exception
-	 */
-	public static ZooKeeperMesosWorkerStore createMesosWorkerStore(
-			CuratorFramework client,
-			Configuration configuration,
-			Executor executor) throws Exception {
-
-		checkNotNull(configuration, "Configuration");
-
-		RetrievableStateStorageHelper<MesosWorkerStore.Worker> stateStorage =
-			ZooKeeperUtils.createFileSystemStateStorage(configuration, "mesosWorkerStore");
-
-		String zooKeeperMesosWorkerStorePath = configuration.getString(
-			ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH
-		);
-
-		return new ZooKeeperMesosWorkerStore(
-			client,
-			zooKeeperMesosWorkerStorePath,
-			stateStorage,
-			executor);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 93ccf68..dcf6a82 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.TestLogger;
 import org.apache.mesos.SchedulerDriver;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
@@ -73,15 +74,18 @@ import static org.mockito.Mockito.*;
 /**
  * General tests for the Mesos resource manager component.
  */
-public class MesosFlinkResourceManagerTest {
+public class MesosFlinkResourceManagerTest extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MesosFlinkResourceManagerTest.class);
 
 	private static ActorSystem system;
 
-	private static Configuration config = new Configuration() {{
-		setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
-		setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
+	private static Configuration config = new Configuration() {
+		private static final long serialVersionUID = -952579203067648838L;
+
+		{
+			setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
+			setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
 	}};
 
 	@BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 70ac6c8..e1d0515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
 import org.apache.flink.util.ConfigurationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
@@ -65,6 +66,8 @@ public class ZooKeeperUtils {
 	 * @return {@link CuratorFramework} instance
 	 */
 	public static CuratorFramework startCuratorFramework(Configuration configuration) {
+		Preconditions.checkNotNull(configuration, "configuration");
+
 		String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
 
 		if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
@@ -328,15 +331,19 @@ public class ZooKeeperUtils {
 		}
 	}
 
-	private static String generateZookeeperPath(String root, String namespace) {
+	public static String generateZookeeperPath(String root, String namespace) {
 		if (!namespace.startsWith("/")) {
-			namespace = "/" + namespace;
+			namespace = '/' + namespace;
 		}
 
 		if (namespace.endsWith("/")) {
 			namespace = namespace.substring(0, namespace.length() - 1);
 		}
 
+		if (root.endsWith("/")) {
+			root = root.substring(0, root.length() - 1);
+		}
+
 		return root + namespace;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedCount.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedCount.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedCount.java
new file mode 100644
index 0000000..d6afbba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedCount.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Wrapper class for a {@link SharedCount} so that we don't expose a curator dependency in our
+ * internal APIs. Such an exposure is problematic due to the relocation of curator.
+ */
+public class ZooKeeperSharedCount {
+
+	private final SharedCount sharedCount;
+
+	public ZooKeeperSharedCount(SharedCount sharedCount) {
+		this.sharedCount = Preconditions.checkNotNull(sharedCount);
+	}
+
+	public void start() throws Exception {
+		sharedCount.start();
+	}
+
+	public void close() throws IOException {
+		sharedCount.close();
+	}
+
+	public ZooKeeperVersionedValue<Integer> getVersionedValue() {
+		return new ZooKeeperVersionedValue<>(sharedCount.getVersionedValue());
+	}
+
+	public boolean trySetCount(ZooKeeperVersionedValue<Integer> previous, int newCount) throws Exception {
+		return sharedCount.trySetCount(previous.getVersionedValue(), newCount);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedValue.java
new file mode 100644
index 0000000..fbe818d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedValue.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Wrapper class for a {@link SharedValue} so that we don't expose a curator dependency in our
+ * internal APIs. Such an exposure is problematic due to the relocation of curator.
+ */
+public class ZooKeeperSharedValue {
+
+	private final SharedValue sharedValue;
+
+	public ZooKeeperSharedValue(SharedValue sharedValue) {
+		this.sharedValue = Preconditions.checkNotNull(sharedValue);
+	}
+
+	public void start() throws Exception {
+		sharedValue.start();
+	}
+
+	public void close() throws IOException {
+		sharedValue.close();
+	}
+
+	public void setValue(byte[] newValue) throws Exception {
+		sharedValue.setValue(newValue);
+	}
+
+	public byte[] getValue() {
+		return sharedValue.getValue();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
new file mode 100644
index 0000000..d3b7dc5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.concurrent.Executor;
+
+/**
+ * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
+ * curator framework is cached in this instance and shared among all created ZooKeeper utility
+ * instances. This requires that the utility classes DO NOT close the provided curator framework.
+ *
+ * <p>The curator framework is closed by calling the {@link #close(boolean)} method.
+ */
+public class ZooKeeperUtilityFactory {
+
+	private final CuratorFramework root;
+
+	// Facade bound to the provided path
+	private final CuratorFramework facade;
+
+	public ZooKeeperUtilityFactory(Configuration configuration, String path) throws Exception {
+		Preconditions.checkNotNull(path, "path");
+
+		root = ZooKeeperUtils.startCuratorFramework(configuration);
+
+		root.newNamespaceAwareEnsurePath(path).ensure(root.getZookeeperClient());
+		facade = root.usingNamespace(ZooKeeperUtils.generateZookeeperPath(root.getNamespace(), path));
+	}
+
+	/**
+	 * Closes the ZooKeeperUtilityFactory. This entails closing the cached {@link CuratorFramework}
+	 * instance. If cleanup is true, then the initial path and all its children are deleted.
+	 *
+	 * @param cleanup deletes the initial path and all of its children to clean up
+	 * @throws Exception when deleting the znodes
+	 */
+	public void close(boolean cleanup) throws Exception {
+		if (cleanup) {
+			facade.delete().deletingChildrenIfNeeded().forPath("/");
+		}
+
+		root.close();
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperStateHandleStore} instance with the provided arguments.
+	 *
+	 * @param zkStateHandleStorePath specifying the path in ZooKeeper to store the state handles to
+	 * @param stateStorageHelper storing the actual state data
+	 * @param executor to run asynchronous callbacks of the state handle store
+	 * @param <T> Type of the state to be stored
+	 * @return a ZooKeeperStateHandleStore instance
+	 * @throws Exception if ZooKeeper could not create the provided state handle store path in
+	 *     ZooKeeper
+	 */
+	public <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStateHandleStore(
+			String zkStateHandleStorePath,
+			RetrievableStateStorageHelper<T> stateStorageHelper,
+			Executor executor) throws Exception {
+
+		facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
+		CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
+			ZooKeeperUtils.generateZookeeperPath(
+				facade.getNamespace(),
+				zkStateHandleStorePath));
+
+		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper, executor);
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperSharedValue} to store a shared value between multiple instances.
+	 *
+	 * @param path to the shared value in ZooKeeper
+	 * @param seedValue for the shared value
+	 * @return a shared value
+	 */
+	public ZooKeeperSharedValue createSharedValue(String path, byte[] seedValue) {
+		return new ZooKeeperSharedValue(
+			new SharedValue(
+				facade,
+				path,
+				seedValue));
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperSharedCount} to store a shared count between multiple instances.
+	 *
+	 * @param path to the shared count in ZooKeeper
+	 * @param seedCount for the shared count
+	 * @return a shared count
+	 */
+	public ZooKeeperSharedCount createSharedCount(String path, int seedCount) {
+		return new ZooKeeperSharedCount(
+			new SharedCount(
+				facade,
+				path,
+				seedCount));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc1fe01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java
new file mode 100644
index 0000000..d23cfc0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Wrapper class for a {@link VersionedValue} so that we don't expose a curator dependency in our
+ * internal APIs. Such an exposure is problematic due to the relocation of curator.
+ */
+public class ZooKeeperVersionedValue<T> {
+
+	private final VersionedValue<T> versionedValue;
+
+	public ZooKeeperVersionedValue(VersionedValue<T> versionedValue) {
+		this.versionedValue = Preconditions.checkNotNull(versionedValue);
+	}
+
+	public T getValue() {
+		return versionedValue.getValue();
+	}
+
+	VersionedValue<T> getVersionedValue() {
+		return versionedValue;
+	}
+}