You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/10 10:13:59 UTC

[GitHub] aljoscha closed pull request #7412: [FLINK-10866][Runtime] 1. Explicitly enable qs server and proxy. 2. QS

aljoscha closed pull request #7412: [FLINK-10866][Runtime] 1. Explicitly enable qs server and proxy. 2. QS
URL: https://github.com/apache/flink/pull/7412
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/queryable_state_configuration.html b/docs/_includes/generated/queryable_state_configuration.html
index c457c40e147..5940fdc8229 100644
--- a/docs/_includes/generated/queryable_state_configuration.html
+++ b/docs/_includes/generated/queryable_state_configuration.html
@@ -42,5 +42,10 @@
             <td style="word-wrap: break-word;">0</td>
             <td>Number of query Threads for queryable state server. Uses the number of slots if set to 0.</td>
         </tr>
+        <tr>
+            <td><h5>queryable-state.enable</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Option whether the queryable state proxy and server should be enabled where possible and configurable.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index 6a8041926b7..20c6b53a209 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -106,6 +106,16 @@
 			.defaultValue(0)
 			.withDescription("Number of query Threads for queryable state server. Uses the number of slots if set to 0.");
 
+	/** Option whether the queryable state proxy and server should be enabled where possible and configurable.
+	 *
+	 * <p>Queryable state proxy and server are still experimental features, hence they are disabled unless
+	 * explicitly enabled in the user configuration. */
+	public static final ConfigOption<Boolean> ENABLE_QUERYABLE_STATE_PROXY_SERVER =
+		key("queryable-state.enable")
+			.defaultValue(false)
+			.withDescription("Option whether the queryable state proxy and server should be enabled where possible" +
+				" and configurable.");
+
 	// ------------------------------------------------------------------------
 	// Client Options
 	// ------------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 9755b52e2f1..73520248475 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -95,6 +95,7 @@ public static void tearDown() throws Exception {
 	private static Configuration getConfig() throws Exception {
 
 		Configuration config = new Configuration();
+		config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 94baaed3745..efade709489 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -95,6 +95,7 @@ public static void tearDown() throws Exception {
 	private static Configuration getConfig() throws Exception {
 
 		Configuration config = new Configuration();
+		config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index 36637c0d342..0cc4b0b3812 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -79,6 +79,7 @@ public static void tearDown() {
 
 	private static Configuration getConfig() {
 		Configuration config = new Configuration();
+		config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index ce3e665fce6..38b2877e551 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -78,6 +78,7 @@ public static void tearDown() {
 
 	private static Configuration getConfig() {
 		Configuration config = new Configuration();
+		config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d124456c213..89e23dac1b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -345,7 +345,7 @@ public void start() throws IOException {
 				} catch (Throwable ie) {
 					kvStateServer.shutdown();
 					kvStateServer = null;
-					throw new IOException("Failed to start the Queryable State Data Server.", ie);
+					LOG.error("Failed to start the Queryable State Data Server.", ie);
 				}
 			}
 
@@ -355,7 +355,7 @@ public void start() throws IOException {
 				} catch (Throwable ie) {
 					kvStateProxy.shutdown();
 					kvStateProxy = null;
-					throw new IOException("Failed to start the Queryable State Client Proxy.", ie);
+					LOG.error("Failed to start the Queryable State Client Proxy.", ie);
 				}
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index c46d800bc95..f3feb2dab71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -419,32 +419,38 @@ private static NetworkEnvironment createNetworkEnvironment(
 
 		QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
 
-		int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
+		KvStateClientProxy kvClientProxy = null;
+		KvStateServer kvStateServer = null;
+
+		if (qsConfig != null) {
+			int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
 				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();
 
-		int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
+			int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
 				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();
 
-		final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
+
+			kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
 				taskManagerServicesConfiguration.getTaskManagerAddress(),
 				qsConfig.getProxyPortRange(),
 				numProxyServerNetworkThreads,
 				numProxyServerQueryThreads,
 				new DisabledKvStateRequestStats());
 
-		int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
+			int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
 				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();
 
-		int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
+			int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
 				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();
 
-		final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
+			kvStateServer = QueryableStateUtils.createKvStateServer(
 				taskManagerServicesConfiguration.getTaskManagerAddress(),
 				qsConfig.getStateServerPortRange(),
 				numStateServerNetworkThreads,
 				numStateServerQueryThreads,
 				kvStateRegistry,
 				new DisabledKvStateRequestStats());
+		}
 
 		// we start the network first, to make sure it can allocate its buffers first
 		return new NetworkEnvironment(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c27e54d7649..5acbddebfed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -39,6 +39,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -66,6 +67,7 @@
 
 	private final NetworkEnvironmentConfiguration networkConfig;
 
+	@Nullable
 	private final QueryableStateConfiguration queryableStateConfig;
 
 	/**
@@ -93,7 +95,7 @@ public TaskManagerServicesConfiguration(
 			String[] localRecoveryStateRootDirectories,
 			boolean localRecoveryEnabled,
 			NetworkEnvironmentConfiguration networkConfig,
-			QueryableStateConfiguration queryableStateConfig,
+			@Nullable QueryableStateConfiguration queryableStateConfig,
 			int numberOfSlots,
 			long configuredMemory,
 			MemoryType memoryType,
@@ -107,7 +109,7 @@ public TaskManagerServicesConfiguration(
 		this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories);
 		this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
 		this.networkConfig = checkNotNull(networkConfig);
-		this.queryableStateConfig = checkNotNull(queryableStateConfig);
+		this.queryableStateConfig = queryableStateConfig;
 		this.numberOfSlots = checkNotNull(numberOfSlots);
 
 		this.configuredMemory = configuredMemory;
@@ -466,6 +468,9 @@ public static boolean hasNewNetworkBufConf(final Configuration config) {
 	 * Creates the {@link QueryableStateConfiguration} from the given Configuration.
 	 */
 	private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
+		if (!config.getBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER)) {
+			return null;
+		}
 
 		final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
 				config.getString(QueryableStateOptions.PROXY_PORT_RANGE));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services