You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/07 13:09:49 UTC
[2/2] flink git commit: [FLINK-7823][QS] Update Queryable State
configuration parameters.
[FLINK-7823][QS] Update Queryable State configuration parameters.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84746a86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84746a86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84746a86
Branch: refs/heads/master
Commit: 84746a861ae02dd1dbcf1938d7ae2bf0d604e35f
Parents: e8931bd
Author: kkloudas <kk...@gmail.com>
Authored: Mon Nov 6 12:43:18 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Tue Nov 7 14:07:54 2017 +0100
----------------------------------------------------------------------
.../configuration/QueryableStateOptions.java | 25 +++----
.../network/AbstractServerBase.java | 8 +--
.../HAAbstractQueryableStateTestBase.java | 2 +-
.../NonHAAbstractQueryableStateTestBase.java | 2 +-
.../network/AbstractServerTest.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 2 +-
.../QueryableStateConfiguration.java | 71 +++++++++++++-------
.../taskexecutor/TaskManagerServices.java | 56 +++++++--------
.../TaskManagerServicesConfiguration.java | 38 ++++++-----
.../minicluster/LocalFlinkMiniCluster.scala | 7 --
10 files changed, 114 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index adba938..ac88bed 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -32,11 +32,6 @@ public class QueryableStateOptions {
// Server Options
// ------------------------------------------------------------------------
- /** Flag to indicate whether to start the queryable state server. */
- public static final ConfigOption<Boolean> SERVER_ENABLE =
- key("query.server.enable")
- .defaultValue(true);
-
/**
* The config parameter defining the server port range of the queryable state proxy.
*
@@ -59,6 +54,16 @@ public class QueryableStateOptions {
key("query.proxy.ports")
.defaultValue("9069");
+ /** Number of network (event loop) threads for the client proxy (0 => #slots). */
+ public static final ConfigOption<Integer> PROXY_NETWORK_THREADS =
+ key("query.proxy.network-threads")
+ .defaultValue(0);
+
+ /** Number of async query threads for the client proxy (0 => #slots). */
+ public static final ConfigOption<Integer> PROXY_ASYNC_QUERY_THREADS =
+ key("query.proxy.query-threads")
+ .defaultValue(0);
+
/**
* The config parameter defining the server port range of the queryable state server.
*
@@ -100,16 +105,6 @@ public class QueryableStateOptions {
key("query.client.network-threads")
.defaultValue(0);
- /** Number of retries on location lookup failures. */
- public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRIES =
- key("query.client.lookup.num-retries")
- .defaultValue(3);
-
- /** Retry delay on location lookup failures (millis). */
- public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRY_DELAY =
- key("query.client.lookup.retry-delay")
- .defaultValue(1000);
-
// ------------------------------------------------------------------------
/** Not intended to be instantiated. */
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 9c88774c..07ca26d 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
*/
public void start() throws Throwable {
Preconditions.checkState(serverAddress == null,
- "Server " + serverName + " already running @ " + serverAddress + '.');
+ "The " + serverName + " already running @ " + serverAddress + '.');
Iterator<Integer> portIterator = bindPortRange.iterator();
while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
if (serverAddress != null) {
- LOG.info("Started server {} @ {}.", serverName, serverAddress);
+ LOG.info("Started the {} @ {}.", serverName, serverAddress);
} else {
- LOG.info("Unable to start server {}. All ports in provided range are occupied.", serverName);
- throw new FlinkRuntimeException("Unable to start server " + serverName + ". All ports in provided range are occupied.");
+ LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
+ throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index ab75cf4..fc4b2bc 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -55,8 +55,8 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
- config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+ config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index 3f1a1fb..6945cca 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -46,7 +46,7 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
- config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+ config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 0b2727c..2775cd4 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
// the expected exception along with the adequate message
expectedEx.expect(FlinkRuntimeException.class);
- expectedEx.expectMessage("Unable to start server Test Server 2. All ports in provided range are occupied.");
+ expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");
TestServer server1 = null;
TestServer server2 = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index cb43fbf..4fffacd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,7 @@ public class NetworkEnvironment {
if (kvStateServer != null) {
try {
kvStateServer.start();
- LOG.info("Started Queryable State Data Server @ {}", kvStateServer.getServerAddress());
+ LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
} catch (Throwable ie) {
kvStateServer.shutdown();
kvStateServer = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index 5e6b7c5..7823a1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.util.NetUtils;
+
import java.util.Iterator;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -27,45 +30,44 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*/
public class QueryableStateConfiguration {
- private final boolean enabled;
-
private final Iterator<Integer> proxyPortRange;
private final Iterator<Integer> qserverPortRange;
+ private final int numProxyThreads;
+
+ private final int numPQueryThreads;
+
private final int numServerThreads;
- private final int numQueryThreads;
+ private final int numSQueryThreads;
public QueryableStateConfiguration(
- boolean enabled,
Iterator<Integer> proxyPortRange,
Iterator<Integer> qserverPortRange,
+ int numProxyThreads,
+ int numPQueryThreads,
int numServerThreads,
- int numQueryThreads) {
+ int numSQueryThreads) {
- checkArgument(!enabled || (proxyPortRange != null && proxyPortRange.hasNext()));
- checkArgument(!enabled || (qserverPortRange != null && qserverPortRange.hasNext()));
+ checkArgument(proxyPortRange != null && proxyPortRange.hasNext());
+ checkArgument(qserverPortRange != null && qserverPortRange.hasNext());
+ checkArgument(numProxyThreads >= 0, "queryable state number of server threads must be zero or larger");
+ checkArgument(numPQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
checkArgument(numServerThreads >= 0, "queryable state number of server threads must be zero or larger");
- checkArgument(numQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
+ checkArgument(numSQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
- this.enabled = enabled;
this.proxyPortRange = proxyPortRange;
this.qserverPortRange = qserverPortRange;
+ this.numProxyThreads = numProxyThreads;
+ this.numPQueryThreads = numPQueryThreads;
this.numServerThreads = numServerThreads;
- this.numQueryThreads = numQueryThreads;
+ this.numSQueryThreads = numSQueryThreads;
}
// ------------------------------------------------------------------------
/**
- * Returns whether queryable state is enabled.
- */
- public boolean isEnabled() {
- return enabled;
- }
-
- /**
* Returns the port range where the queryable state client proxy can listen.
* See {@link org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE QueryableStateOptions.PROXY_PORT_RANGE}.
*/
@@ -85,7 +87,23 @@ public class QueryableStateConfiguration {
* Returns the number of threads for the query server NIO event loop.
* These threads only process network events and dispatch query requests to the query threads.
*/
- public int numServerThreads() {
+ public int numProxyServerThreads() {
+ return numProxyThreads;
+ }
+
+ /**
+ * Returns the number of threads for the thread pool that performs the actual state lookup.
+ * These threads perform the actual state lookup.
+ */
+ public int numProxyQueryThreads() {
+ return numPQueryThreads;
+ }
+
+ /**
+ * Returns the number of threads for the query server NIO event loop.
+ * These threads only process network events and dispatch query requests to the query threads.
+ */
+ public int numStateServerThreads() {
return numServerThreads;
}
@@ -93,18 +111,19 @@ public class QueryableStateConfiguration {
* Returns the number of threads for the thread pool that performs the actual state lookup.
* These threads perform the actual state lookup.
*/
- public int numQueryThreads() {
- return numQueryThreads;
+ public int numStateQueryThreads() {
+ return numSQueryThreads;
}
// ------------------------------------------------------------------------
@Override
public String toString() {
- return "QueryableStateConfiguration {" +
- "enabled=" + enabled +
- ", numServerThreads=" + numServerThreads +
- ", numQueryThreads=" + numQueryThreads +
+ return "QueryableStateConfiguration{" +
+ "numProxyServerThreads=" + numProxyThreads +
+ ", numProxyQueryThreads=" + numPQueryThreads +
+ ", numStateServerThreads=" + numServerThreads +
+ ", numStateQueryThreads=" + numSQueryThreads +
'}';
}
@@ -114,6 +133,8 @@ public class QueryableStateConfiguration {
* Gets the configuration describing the queryable state as deactivated.
*/
public static QueryableStateConfiguration disabled() {
- return new QueryableStateConfiguration(false, null, null, 0, 0);
+ final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue());
+ final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.SERVER_PORT_RANGE.defaultValue());
+ return new QueryableStateConfiguration(proxyPorts, serverPorts, 0, 0, 0, 0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index aed03f6..4daff05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -327,33 +327,35 @@ public class TaskManagerServices {
TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
KvStateRegistry kvStateRegistry = new KvStateRegistry();
- KvStateClientProxy kvClientProxy = null;
- KvStateServer kvStateServer = null;
-
- if (taskManagerServicesConfiguration.getQueryableStateConfig().isEnabled()) {
- QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
-
- int numNetworkThreads = qsConfig.numServerThreads() == 0 ?
- taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numServerThreads();
-
- int numQueryThreads = qsConfig.numQueryThreads() == 0 ?
- taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numQueryThreads();
-
- kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
- taskManagerServicesConfiguration.getTaskManagerAddress(),
- qsConfig.getProxyPortRange(),
- numNetworkThreads,
- numQueryThreads,
- new DisabledKvStateRequestStats());
-
- kvStateServer = QueryableStateUtils.createKvStateServer(
- taskManagerServicesConfiguration.getTaskManagerAddress(),
- qsConfig.getStateServerPortRange(),
- numNetworkThreads,
- numQueryThreads,
- kvStateRegistry,
- new DisabledKvStateRequestStats());
- }
+
+ QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
+
+ int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
+ taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();
+
+ int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
+ taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();
+
+ final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
+ taskManagerServicesConfiguration.getTaskManagerAddress(),
+ qsConfig.getProxyPortRange(),
+ numProxyServerNetworkThreads,
+ numProxyServerQueryThreads,
+ new DisabledKvStateRequestStats());
+
+ int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
+ taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();
+
+ int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
+ taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();
+
+ final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
+ taskManagerServicesConfiguration.getTaskManagerAddress(),
+ qsConfig.getStateServerPortRange(),
+ numStateServerNetworkThreads,
+ numStateServerQueryThreads,
+ kvStateRegistry,
+ new DisabledKvStateRequestStats());
// we start the network first, to make sure it can allocate its buffers first
return new NetworkEnvironment(
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 990fb22..bae683b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -400,23 +400,27 @@ public class TaskManagerServicesConfiguration {
* Creates the {@link QueryableStateConfiguration} from the given Configuration.
*/
private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
- final boolean enabled = config.getBoolean(QueryableStateOptions.SERVER_ENABLE);
-
- if (enabled) {
- final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
- config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
- QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
- final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(
- config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
- QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
-
- final int numNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
- final int numQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
- return new QueryableStateConfiguration(true, proxyPorts, serverPorts, numNetworkThreads, numQueryThreads);
- }
- else {
- return QueryableStateConfiguration.disabled();
- }
+
+ final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
+ config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
+ QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+ final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(
+ config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
+ QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
+
+ final int numProxyServerNetworkThreads = config.getInteger(QueryableStateOptions.PROXY_NETWORK_THREADS);
+ final int numProxyServerQueryThreads = config.getInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS);
+
+ final int numStateServerNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
+ final int numStateServerQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
+
+ return new QueryableStateConfiguration(
+ proxyPorts,
+ serverPorts,
+ numProxyServerNetworkThreads,
+ numProxyServerQueryThreads,
+ numStateServerNetworkThreads,
+ numStateServerQueryThreads);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 89197e2..8ef2e36 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -94,13 +94,6 @@ class LocalFlinkMiniCluster(
config.addAll(userConfiguration)
setMemory(config)
initializeIOFormatClasses(config)
-
- // Disable queryable state server if nothing else is configured explicitly
- if (!config.containsKey(QueryableStateOptions.SERVER_ENABLE.key())) {
- LOG.info("Disabled queryable state server")
- config.setBoolean(QueryableStateOptions.SERVER_ENABLE, false)
- }
-
config
}