You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/07 13:11:24 UTC

[GitHub] eaglewatcherwb closed pull request #7412: [FLINK-10866][Runtime] 1. Explicitly enable qs server and proxy. 2. QS

eaglewatcherwb closed pull request #7412: [FLINK-10866][Runtime] 1. Explicitly enable qs server and proxy. 2. QS
URL: https://github.com/apache/flink/pull/7412
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/queryable_state_configuration.html b/docs/_includes/generated/queryable_state_configuration.html
index c457c40e147..a979658c070 100644
--- a/docs/_includes/generated/queryable_state_configuration.html
+++ b/docs/_includes/generated/queryable_state_configuration.html
@@ -12,6 +12,11 @@
             <td style="word-wrap: break-word;">0</td>
             <td>Number of network (Netty's event loop) Threads for queryable state client.</td>
         </tr>
+        <tr>
+            <td><h5>query.enable</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Option whether the queryable state proxy and server should be enabled where possible and configurable.</td>
+        </tr>
         <tr>
             <td><h5>query.proxy.network-threads</h5></td>
             <td style="word-wrap: break-word;">0</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index 6a8041926b7..0411a0eeb16 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -106,6 +106,16 @@
 			.defaultValue(0)
 			.withDescription("Number of query Threads for queryable state server. Uses the number of slots if set to 0.");
 
+	/** Option whether the queryable state proxy and server should be enabled where possible and configurable.
+	 *
+	 * <p>Queryable state proxy and server are still more experimental features, hence disabled unless they are enable
+	 * in user configuration. */
+	public static final ConfigOption<Boolean> ENABLE_QUERYABLE_STATE_PROXY_SERVER =
+		key("query.enable")
+			.defaultValue(false)
+			.withDescription("Option whether the queryable state proxy and server should be enabled where possible" +
+				" and configurable.");
+
 	// ------------------------------------------------------------------------
 	// Client Options
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d124456c213..89e23dac1b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -345,7 +345,7 @@ public void start() throws IOException {
 				} catch (Throwable ie) {
 					kvStateServer.shutdown();
 					kvStateServer = null;
-					throw new IOException("Failed to start the Queryable State Data Server.", ie);
+					LOG.error("Failed to start the Queryable State Data Server.", ie);
 				}
 			}
 
@@ -355,7 +355,7 @@ public void start() throws IOException {
 				} catch (Throwable ie) {
 					kvStateProxy.shutdown();
 					kvStateProxy = null;
-					throw new IOException("Failed to start the Queryable State Client Proxy.", ie);
+					LOG.error("Failed to start the Queryable State Client Proxy.", ie);
 				}
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index c46d800bc95..f3feb2dab71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -419,32 +419,38 @@ private static NetworkEnvironment createNetworkEnvironment(
 
 		QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
 
-		int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
+		KvStateClientProxy kvClientProxy = null;
+		KvStateServer kvStateServer = null;
+
+		if (qsConfig != null) {
+			int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
 				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();
 
-		int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
+			int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
 				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();
 
-		final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
+
+			kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
 				taskManagerServicesConfiguration.getTaskManagerAddress(),
 				qsConfig.getProxyPortRange(),
 				numProxyServerNetworkThreads,
 				numProxyServerQueryThreads,
 				new DisabledKvStateRequestStats());
 
-		int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
+			int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
 				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();
 
-		int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
+			int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
 				taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();
 
-		final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
+			kvStateServer = QueryableStateUtils.createKvStateServer(
 				taskManagerServicesConfiguration.getTaskManagerAddress(),
 				qsConfig.getStateServerPortRange(),
 				numStateServerNetworkThreads,
 				numStateServerQueryThreads,
 				kvStateRegistry,
 				new DisabledKvStateRequestStats());
+		}
 
 		// we start the network first, to make sure it can allocate its buffers first
 		return new NetworkEnvironment(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c27e54d7649..5acbddebfed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -39,6 +39,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -66,6 +67,7 @@
 
 	private final NetworkEnvironmentConfiguration networkConfig;
 
+	@Nullable
 	private final QueryableStateConfiguration queryableStateConfig;
 
 	/**
@@ -93,7 +95,7 @@ public TaskManagerServicesConfiguration(
 			String[] localRecoveryStateRootDirectories,
 			boolean localRecoveryEnabled,
 			NetworkEnvironmentConfiguration networkConfig,
-			QueryableStateConfiguration queryableStateConfig,
+			@Nullable QueryableStateConfiguration queryableStateConfig,
 			int numberOfSlots,
 			long configuredMemory,
 			MemoryType memoryType,
@@ -107,7 +109,7 @@ public TaskManagerServicesConfiguration(
 		this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories);
 		this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
 		this.networkConfig = checkNotNull(networkConfig);
-		this.queryableStateConfig = checkNotNull(queryableStateConfig);
+		this.queryableStateConfig = queryableStateConfig;
 		this.numberOfSlots = checkNotNull(numberOfSlots);
 
 		this.configuredMemory = configuredMemory;
@@ -466,6 +468,9 @@ public static boolean hasNewNetworkBufConf(final Configuration config) {
 	 * Creates the {@link QueryableStateConfiguration} from the given Configuration.
 	 */
 	private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
+		if (!config.getBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER)) {
+			return null;
+		}
 
 		final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
 				config.getString(QueryableStateOptions.PROXY_PORT_RANGE));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index 5cd1a50176d..ccbd9e0b590 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -38,6 +38,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.configuration.QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER;
+
 /**
  * Resource which starts a {@link MiniCluster} for testing purposes.
  */
@@ -113,6 +115,7 @@ public void after() {
 
 	private void startMiniCluster() throws Exception {
 		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
+		configuration.setBoolean(ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
 		configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
 
 		// we need to set this since a lot of test expect this because TestBaseUtils.startCluster()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services