You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/07 13:09:49 UTC

[2/2] flink git commit: [FLINK-7823][QS] Update Queryable State configuration parameters.

[FLINK-7823][QS] Update Queryable State configuration parameters.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84746a86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84746a86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84746a86

Branch: refs/heads/master
Commit: 84746a861ae02dd1dbcf1938d7ae2bf0d604e35f
Parents: e8931bd
Author: kkloudas <kk...@gmail.com>
Authored: Mon Nov 6 12:43:18 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Tue Nov 7 14:07:54 2017 +0100

----------------------------------------------------------------------
 .../configuration/QueryableStateOptions.java    | 25 +++----
 .../network/AbstractServerBase.java             |  8 +--
 .../HAAbstractQueryableStateTestBase.java       |  2 +-
 .../NonHAAbstractQueryableStateTestBase.java    |  2 +-
 .../network/AbstractServerTest.java             |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  2 +-
 .../QueryableStateConfiguration.java            | 71 +++++++++++++-------
 .../taskexecutor/TaskManagerServices.java       | 56 +++++++--------
 .../TaskManagerServicesConfiguration.java       | 38 ++++++-----
 .../minicluster/LocalFlinkMiniCluster.scala     |  7 --
 10 files changed, 114 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index adba938..ac88bed 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -32,11 +32,6 @@ public class QueryableStateOptions {
 	// Server Options
 	// ------------------------------------------------------------------------
 
-	/** Flag to indicate whether to start the queryable state server. */
-	public static final ConfigOption<Boolean> SERVER_ENABLE =
-			key("query.server.enable")
-			.defaultValue(true);
-
 	/**
 	 * The config parameter defining the server port range of the queryable state proxy.
 	 *
@@ -59,6 +54,16 @@ public class QueryableStateOptions {
 			key("query.proxy.ports")
 			.defaultValue("9069");
 
+	/** Number of network (event loop) threads for the client proxy (0 => #slots). */
+	public static final ConfigOption<Integer> PROXY_NETWORK_THREADS =
+			key("query.proxy.network-threads")
+					.defaultValue(0);
+
+	/** Number of async query threads for the client proxy (0 => #slots). */
+	public static final ConfigOption<Integer> PROXY_ASYNC_QUERY_THREADS =
+			key("query.proxy.query-threads")
+					.defaultValue(0);
+
 	/**
 	 * The config parameter defining the server port range of the queryable state server.
 	 *
@@ -100,16 +105,6 @@ public class QueryableStateOptions {
 			key("query.client.network-threads")
 			.defaultValue(0);
 
-	/** Number of retries on location lookup failures. */
-	public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRIES =
-			key("query.client.lookup.num-retries")
-			.defaultValue(3);
-
-	/** Retry delay on location lookup failures (millis). */
-	public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRY_DELAY =
-			key("query.client.lookup.retry-delay")
-			.defaultValue(1000);
-
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated. */

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 9c88774c..07ca26d 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 */
 	public void start() throws Throwable {
 		Preconditions.checkState(serverAddress == null,
-				"Server " + serverName + " already running @ " + serverAddress + '.');
+				"The " + serverName + " already running @ " + serverAddress + '.');
 
 		Iterator<Integer> portIterator = bindPortRange.iterator();
 		while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
 
 		if (serverAddress != null) {
-			LOG.info("Started server {} @ {}.", serverName, serverAddress);
+			LOG.info("Started the {} @ {}.", serverName, serverAddress);
 		} else {
-			LOG.info("Unable to start server {}. All ports in provided range are occupied.", serverName);
-			throw new FlinkRuntimeException("Unable to start server " + serverName + ". All ports in provided range are occupied.");
+			LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
+			throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index ab75cf4..fc4b2bc 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -55,8 +55,8 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
-			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
 			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+			config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
 			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
 			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
 			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index 3f1a1fb..6945cca 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -46,7 +46,7 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
 			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
 			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
 			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
 			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 0b2727c..2775cd4 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
 
 		// the expected exception along with the adequate message
 		expectedEx.expect(FlinkRuntimeException.class);
-		expectedEx.expectMessage("Unable to start server Test Server 2. All ports in provided range are occupied.");
+		expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");
 
 		TestServer server1 = null;
 		TestServer server2 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index cb43fbf..4fffacd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,7 @@ public class NetworkEnvironment {
 			if (kvStateServer != null) {
 				try {
 					kvStateServer.start();
-					LOG.info("Started Queryable State Data Server @ {}", kvStateServer.getServerAddress());
+					LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
 				} catch (Throwable ie) {
 					kvStateServer.shutdown();
 					kvStateServer = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index 5e6b7c5..7823a1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.util.NetUtils;
+
 import java.util.Iterator;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -27,45 +30,44 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  */
 public class QueryableStateConfiguration {
 
-	private final boolean enabled;
-
 	private final Iterator<Integer> proxyPortRange;
 
 	private final Iterator<Integer> qserverPortRange;
 
+	private final int numProxyThreads;
+
+	private final int numPQueryThreads;
+
 	private final int numServerThreads;
 
-	private final int numQueryThreads;
+	private final int numSQueryThreads;
 
 	public QueryableStateConfiguration(
-			boolean enabled,
 			Iterator<Integer> proxyPortRange,
 			Iterator<Integer> qserverPortRange,
+			int numProxyThreads,
+			int numPQueryThreads,
 			int numServerThreads,
-			int numQueryThreads) {
+			int numSQueryThreads) {
 
-		checkArgument(!enabled || (proxyPortRange != null && proxyPortRange.hasNext()));
-		checkArgument(!enabled || (qserverPortRange != null && qserverPortRange.hasNext()));
+		checkArgument(proxyPortRange != null && proxyPortRange.hasNext());
+		checkArgument(qserverPortRange != null && qserverPortRange.hasNext());
+		checkArgument(numProxyThreads >= 0, "queryable state number of server threads must be zero or larger");
+		checkArgument(numPQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
 		checkArgument(numServerThreads >= 0, "queryable state number of server threads must be zero or larger");
-		checkArgument(numQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
+		checkArgument(numSQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
 
-		this.enabled = enabled;
 		this.proxyPortRange = proxyPortRange;
 		this.qserverPortRange = qserverPortRange;
+		this.numProxyThreads = numProxyThreads;
+		this.numPQueryThreads = numPQueryThreads;
 		this.numServerThreads = numServerThreads;
-		this.numQueryThreads = numQueryThreads;
+		this.numSQueryThreads = numSQueryThreads;
 	}
 
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Returns whether queryable state is enabled.
-	 */
-	public boolean isEnabled() {
-		return enabled;
-	}
-
-	/**
 	 * Returns the port range where the queryable state client proxy can listen.
 	 * See {@link org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE QueryableStateOptions.PROXY_PORT_RANGE}.
 	 */
@@ -85,7 +87,23 @@ public class QueryableStateConfiguration {
 	 * Returns the number of threads for the query server NIO event loop.
 	 * These threads only process network events and dispatch query requests to the query threads.
 	 */
-	public int numServerThreads() {
+	public int numProxyServerThreads() {
+		return numProxyThreads;
+	}
+
+	/**
+	 * Returns the number of threads for the thread pool that performs the actual state lookup.
+	 * These threads perform the actual state lookup.
+	 */
+	public int numProxyQueryThreads() {
+		return numPQueryThreads;
+	}
+
+	/**
+	 * Returns the number of threads for the query server NIO event loop.
+	 * These threads only process network events and dispatch query requests to the query threads.
+	 */
+	public int numStateServerThreads() {
 		return numServerThreads;
 	}
 
@@ -93,18 +111,19 @@ public class QueryableStateConfiguration {
 	 * Returns the number of threads for the thread pool that performs the actual state lookup.
 	 * These threads perform the actual state lookup.
 	 */
-	public int numQueryThreads() {
-		return numQueryThreads;
+	public int numStateQueryThreads() {
+		return numSQueryThreads;
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
 	public String toString() {
-		return "QueryableStateConfiguration {" +
-				"enabled=" + enabled +
-				", numServerThreads=" + numServerThreads +
-				", numQueryThreads=" + numQueryThreads +
+		return "QueryableStateConfiguration{" +
+				"numProxyServerThreads=" + numProxyThreads +
+				", numProxyQueryThreads=" + numPQueryThreads +
+				", numStateServerThreads=" + numServerThreads +
+				", numStateQueryThreads=" + numSQueryThreads +
 				'}';
 	}
 
@@ -114,6 +133,8 @@ public class QueryableStateConfiguration {
 	 * Gets the configuration describing the queryable state as deactivated.
 	 */
 	public static QueryableStateConfiguration disabled() {
-		return new QueryableStateConfiguration(false, null, null, 0, 0);
+		final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue());
+		final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.SERVER_PORT_RANGE.defaultValue());
+		return new QueryableStateConfiguration(proxyPorts, serverPorts, 0, 0, 0, 0);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index aed03f6..4daff05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -327,33 +327,35 @@ public class TaskManagerServices {
 		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 
 		KvStateRegistry kvStateRegistry = new KvStateRegistry();
-		KvStateClientProxy kvClientProxy = null;
-		KvStateServer kvStateServer = null;
-
-		if (taskManagerServicesConfiguration.getQueryableStateConfig().isEnabled()) {
-			QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
-
-			int numNetworkThreads = qsConfig.numServerThreads() == 0 ?
-					taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numServerThreads();
-
-			int numQueryThreads = qsConfig.numQueryThreads() == 0 ?
-					taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numQueryThreads();
-
-			kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
-					taskManagerServicesConfiguration.getTaskManagerAddress(),
-					qsConfig.getProxyPortRange(),
-					numNetworkThreads,
-					numQueryThreads,
-					new DisabledKvStateRequestStats());
-
-			kvStateServer = QueryableStateUtils.createKvStateServer(
-					taskManagerServicesConfiguration.getTaskManagerAddress(),
-					qsConfig.getStateServerPortRange(),
-					numNetworkThreads,
-					numQueryThreads,
-					kvStateRegistry,
-					new DisabledKvStateRequestStats());
-		}
+
+		QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
+
+		int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
+				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();
+
+		int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
+				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();
+
+		final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
+				taskManagerServicesConfiguration.getTaskManagerAddress(),
+				qsConfig.getProxyPortRange(),
+				numProxyServerNetworkThreads,
+				numProxyServerQueryThreads,
+				new DisabledKvStateRequestStats());
+
+		int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
+				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();
+
+		int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
+				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();
+
+		final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
+				taskManagerServicesConfiguration.getTaskManagerAddress(),
+				qsConfig.getStateServerPortRange(),
+				numStateServerNetworkThreads,
+				numStateServerQueryThreads,
+				kvStateRegistry,
+				new DisabledKvStateRequestStats());
 
 		// we start the network first, to make sure it can allocate its buffers first
 		return new NetworkEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 990fb22..bae683b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -400,23 +400,27 @@ public class TaskManagerServicesConfiguration {
 	 * Creates the {@link QueryableStateConfiguration} from the given Configuration.
 	 */
 	private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
-		final boolean enabled = config.getBoolean(QueryableStateOptions.SERVER_ENABLE);
-
-		if (enabled) {
-			final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
-					config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
-							QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
-			final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(
-					config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
-							QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
-
-			final int numNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
-			final int numQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
-			return new QueryableStateConfiguration(true, proxyPorts, serverPorts, numNetworkThreads, numQueryThreads);
-		}
-		else {
-			return QueryableStateConfiguration.disabled();
-		}
+
+		final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
+				config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
+						QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+		final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(
+				config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
+						QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
+
+		final int numProxyServerNetworkThreads = config.getInteger(QueryableStateOptions.PROXY_NETWORK_THREADS);
+		final int numProxyServerQueryThreads = config.getInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS);
+
+		final int numStateServerNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
+		final int numStateServerQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
+
+		return new QueryableStateConfiguration(
+				proxyPorts,
+				serverPorts,
+				numProxyServerNetworkThreads,
+				numProxyServerQueryThreads,
+				numStateServerNetworkThreads,
+				numStateServerQueryThreads);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 89197e2..8ef2e36 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -94,13 +94,6 @@ class LocalFlinkMiniCluster(
     config.addAll(userConfiguration)
     setMemory(config)
     initializeIOFormatClasses(config)
-
-    // Disable queryable state server if nothing else is configured explicitly
-    if (!config.containsKey(QueryableStateOptions.SERVER_ENABLE.key())) {
-      LOG.info("Disabled queryable state server")
-      config.setBoolean(QueryableStateOptions.SERVER_ENABLE, false)
-    }
-
     config
   }