You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/03 08:29:00 UTC

[1/2] flink git commit: [FLINK-8683][docs] Regenerate config docs

Repository: flink
Updated Branches:
  refs/heads/master 14e7d35f2 -> a904964b7


[FLINK-8683][docs] Regenerate config docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eaedc971
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eaedc971
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eaedc971

Branch: refs/heads/master
Commit: eaedc9711d18a91b143232d4f2a941a57f251a50
Parents: 14e7d35
Author: zentol <ch...@apache.org>
Authored: Wed May 2 22:38:05 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 3 10:28:14 2018 +0200

----------------------------------------------------------------------
 .../generated/checkpointing_configuration.html  | 10 ++++
 .../_includes/generated/core_configuration.html |  4 +-
 .../generated/netty_configuration.html          | 15 ------
 .../_includes/generated/rest_configuration.html | 21 ++++++--
 .../generated/task_manager_configuration.html   | 50 ++++++++++----------
 5 files changed, 55 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eaedc971/docs/_includes/generated/checkpointing_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html
index c64112e..8b4233f 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -28,6 +28,11 @@
             <td>Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Some state backends may not support incremental checkpoints and ignore this option.</td>
         </tr>
         <tr>
+            <td><h5>state.backend.local-recovery</h5></td>
+            <td style="word-wrap: break-word;">"DISABLED"</td>
+            <td></td>
+        </tr>
+        <tr>
             <td><h5>state.backend.rocksdb.localdir</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
@@ -47,5 +52,10 @@
             <td style="word-wrap: break-word;">(none)</td>
             <td>The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.state.local.root-dirs</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td></td>
+        </tr>
     </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/eaedc971/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 5e29cb9..91fa1a5 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -29,8 +29,8 @@
         </tr>
         <tr>
             <td><h5>mode</h5></td>
-            <td style="word-wrap: break-word;">"flip6"</td>
-            <td>Switch to select the execution mode. Possible values are 'flip6' and 'old'.</td>
+            <td style="word-wrap: break-word;">"new"</td>
+            <td>Switch to select the execution mode. Possible values are 'new' and 'legacy'.</td>
         </tr>
         <tr>
             <td><h5>parallelism.default</h5></td>

http://git-wip-us.apache.org/repos/asf/flink/blob/eaedc971/docs/_includes/generated/netty_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/netty_configuration.html b/docs/_includes/generated/netty_configuration.html
index e97fda4..47c48c0 100644
--- a/docs/_includes/generated/netty_configuration.html
+++ b/docs/_includes/generated/netty_configuration.html
@@ -42,20 +42,5 @@
             <td style="word-wrap: break-word;">"nio"</td>
             <td>The Netty transport type, either "nio" or "epoll"</td>
         </tr>
-        <tr>
-            <td><h5>taskmanager.network.credit-based-flow-control.enabled</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean flag to enable/disable network credit-based flow control</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
-            <td style="word-wrap: break-word;">2</td>
-            <td>Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
-            <td style="word-wrap: break-word;">8</td>
-            <td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster</td>
-        </tr>
     </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/eaedc971/docs/_includes/generated/rest_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html
index 20fe961..1de4165 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -9,8 +9,8 @@
     <tbody>
         <tr>
             <td><h5>rest.address</h5></td>
-            <td style="word-wrap: break-word;">"localhost"</td>
-            <td>The address that the server binds itself to / the client connects to.</td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The address that should be used by clients to connect to the server.</td>
         </tr>
         <tr>
             <td><h5>rest.await-leader-timeout</h5></td>
@@ -18,13 +18,23 @@
             <td>The time in ms that the client waits for the leader address, e.g., Dispatcher or WebMonitorEndpoint</td>
         </tr>
         <tr>
+            <td><h5>rest.bind-address</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The address that the server binds itself.</td>
+        </tr>
+        <tr>
+            <td><h5>rest.client.max-content-length</h5></td>
+            <td style="word-wrap: break-word;">104857600</td>
+            <td>The maximum content length in bytes that the client will handle.</td>
+        </tr>
+        <tr>
             <td><h5>rest.connection-timeout</h5></td>
             <td style="word-wrap: break-word;">15000</td>
             <td>The maximum time in ms for the client to establish a TCP connection.</td>
         </tr>
         <tr>
             <td><h5>rest.port</h5></td>
-            <td style="word-wrap: break-word;">9065</td>
+            <td style="word-wrap: break-word;">8081</td>
             <td>The port that the server listens on / the client connects to.</td>
         </tr>
         <tr>
@@ -37,5 +47,10 @@
             <td style="word-wrap: break-word;">20</td>
             <td>The number of retries the client will attempt if a retryable operations fails.</td>
         </tr>
+        <tr>
+            <td><h5>rest.server.max-content-length</h5></td>
+            <td style="word-wrap: break-word;">104857600</td>
+            <td>The maximum content length in bytes that the server will handle.</td>
+        </tr>
     </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/eaedc971/docs/_includes/generated/task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html
index bd42244..fe999a8 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -38,16 +38,16 @@
             <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true</td>
         </tr>
         <tr>
-            <td><h5>taskmanager.debug.memory.logIntervalMs</h5></td>
-            <td style="word-wrap: break-word;">5000</td>
-            <td>The interval (in ms) for the log thread to log the current memory usage.</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.debug.memory.startLogThread</h5></td>
+            <td><h5>taskmanager.debug.memory.log</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.</td>
         </tr>
         <tr>
+            <td><h5>taskmanager.debug.memory.log-interval</h5></td>
+            <td style="word-wrap: break-word;">5000</td>
+            <td>The interval (in ms) for the log thread to log the current memory usage.</td>
+        </tr>
+        <tr>
             <td><h5>taskmanager.exit-on-fatal-akka-error</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Whether the quarantine monitor for task managers shall be started. The quarantine monitor shuts down the actor system if it detects that it has quarantined another actor system or if it has been quarantined by another actor system.</td>
@@ -63,26 +63,11 @@
             <td>The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.</td>
         </tr>
         <tr>
-            <td><h5>taskmanager.initial-registration-pause</h5></td>
-            <td style="word-wrap: break-word;">"500 ms"</td>
-            <td>The initial registration pause between two consecutive registration attempts. The pause is doubled for each new registration attempt until it reaches the maximum registration pause.</td>
-        </tr>
-        <tr>
             <td><h5>taskmanager.jvm-exit-on-oom</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.</td>
         </tr>
         <tr>
-            <td><h5>taskmanager.max-registration-pause</h5></td>
-            <td style="word-wrap: break-word;">"30 s"</td>
-            <td>The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d).</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.maxRegistrationDuration</h5></td>
-            <td style="word-wrap: break-word;">"Inf"</td>
-            <td>Defines the maximum time it can take for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.</td>
-        </tr>
-        <tr>
             <td><h5>taskmanager.memory.fraction</h5></td>
             <td style="word-wrap: break-word;">0.7</td>
             <td>The relative amount of memory (after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. This parameter is only evaluated, if taskmanager.memory.size is not set.</td>
@@ -115,12 +100,12 @@
         <tr>
             <td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
             <td style="word-wrap: break-word;">2</td>
-            <td>Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).</td>
+            <td>Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
             <td style="word-wrap: break-word;">8</td>
-            <td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).</td>
+            <td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.memory.fraction</h5></td>
@@ -153,9 +138,24 @@
             <td>The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).</td>
         </tr>
         <tr>
-            <td><h5>taskmanager.refused-registration-pause</h5></td>
+            <td><h5>taskmanager.registration.initial-backoff</h5></td>
+            <td style="word-wrap: break-word;">"500 ms"</td>
+            <td>The initial registration backoff between two consecutive registration attempts. The backoff is doubled for each new registration attempt until it reaches the maximum registration backoff.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.registration.max-backoff</h5></td>
+            <td style="word-wrap: break-word;">"30 s"</td>
+            <td>The maximum registration backoff between two consecutive registration attempts. The max registration backoff requires a time unit specifier (ms/s/min/h/d).</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.registration.refused-backoff</h5></td>
             <td style="word-wrap: break-word;">"10 s"</td>
-            <td>The pause after a registration has been refused by the job manager before retrying to connect.</td>
+            <td>The backoff after a registration has been refused by the job manager before retrying to connect.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.registration.timeout</h5></td>
+            <td style="word-wrap: break-word;">"Inf"</td>
+            <td>Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.rpc.port</h5></td>


[2/2] flink git commit: [FLINK-8683][docs] Add test for configuration docs completeness

Posted by ch...@apache.org.
[FLINK-8683][docs] Add test for configuration docs completeness

This closes #5515.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a904964b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a904964b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a904964b

Branch: refs/heads/master
Commit: a904964b75cd2403d3bfe6b705295fd0a676ed9d
Parents: eaedc97
Author: zentol <ch...@apache.org>
Authored: Fri Feb 16 23:52:15 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu May 3 10:28:18 2018 +0200

----------------------------------------------------------------------
 flink-docs/pom.xml                              |   8 +
 .../ConfigOptionsDocGenerator.java              |  42 ++--
 .../ConfigDocsCompletenessChecker.java          |  49 ----
 .../ConfigOptionsDocsCompletenessTest.java      | 225 +++++++++++++++++++
 4 files changed, 254 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a904964b/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 969e2f7..ef02d2c 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -89,6 +89,14 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<!-- Used for parsing HTML -->
+			<groupId>org.jsoup</groupId>
+			<artifactId>jsoup</artifactId>
+			<version>1.11.2</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
 			<!-- Required because the surefire plugin tries to load the New category annotation -->
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils-junit</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/a904964b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index aadeee6..f9e94c8 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -48,7 +48,7 @@ import java.util.regex.Pattern;
  */
 public class ConfigOptionsDocGenerator {
 
-	private static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[]{
+	static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[]{
 		new OptionsClassLocation("flink-core", "org.apache.flink.configuration"),
 		new OptionsClassLocation("flink-runtime", "org.apache.flink.runtime.io.network.netty"),
 		new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
@@ -107,7 +107,7 @@ public class ConfigOptionsDocGenerator {
 		});
 	}
 
-	private static void processConfigOptions(String rootDir, String module, String packageName, ThrowingConsumer<Class<?>, IOException> classConsumer) throws IOException, ClassNotFoundException {
+	static void processConfigOptions(String rootDir, String module, String packageName, ThrowingConsumer<Class<?>, IOException> classConsumer) throws IOException, ClassNotFoundException {
 		Path configDir = Paths.get(rootDir, module, "src/main/java", packageName.replaceAll("\\.", "/"));
 
 		try (DirectoryStream<Path> stream = Files.newDirectoryStream(configDir)) {
@@ -147,7 +147,7 @@ public class ConfigOptionsDocGenerator {
 		return tables;
 	}
 
-	private static List<OptionWithMetaInfo> extractConfigOptions(Class<?> clazz) {
+	static List<OptionWithMetaInfo> extractConfigOptions(Class<?> clazz) {
 		try {
 			List<OptionWithMetaInfo> configOptions = new ArrayList<>(8);
 			Field[] fields = clazz.getFields();
@@ -200,31 +200,31 @@ public class ConfigOptionsDocGenerator {
 	 */
 	private static String toHtmlString(final OptionWithMetaInfo optionWithMetaInfo) {
 		ConfigOption<?> option = optionWithMetaInfo.option;
-		String defaultValue;
+		String defaultValue = stringifyDefault(optionWithMetaInfo);
 
-		Documentation.OverrideDefault overrideDocumentedDefault = optionWithMetaInfo.field.getAnnotation(Documentation.OverrideDefault.class);
-		if (overrideDocumentedDefault != null) {
-			defaultValue = escapeCharacters(addWordBreakOpportunities(overrideDocumentedDefault.value()));
-		} else {
-			defaultValue = escapeCharacters(addWordBreakOpportunities(defaultValueToHtml(option.defaultValue())));
-		}
 		return "" +
 			"        <tr>\n" +
 			"            <td><h5>" + escapeCharacters(option.key()) + "</h5></td>\n" +
-			"            <td style=\"word-wrap: break-word;\">" + defaultValue + "</td>\n" +
+			"            <td style=\"word-wrap: break-word;\">" + escapeCharacters(addWordBreakOpportunities(defaultValue)) + "</td>\n" +
 			"            <td>" + escapeCharacters(option.description()) + "</td>\n" +
 			"        </tr>\n";
 	}
 
-	private static String defaultValueToHtml(Object value) {
-		if (value instanceof String) {
-			if (((String) value).isEmpty()) {
-				return "(none)";
+	static String stringifyDefault(OptionWithMetaInfo optionWithMetaInfo) {
+		ConfigOption<?> option = optionWithMetaInfo.option;
+		Documentation.OverrideDefault overrideDocumentedDefault = optionWithMetaInfo.field.getAnnotation(Documentation.OverrideDefault.class);
+		if (overrideDocumentedDefault != null) {
+			return overrideDocumentedDefault.value();
+		} else {
+			Object value = option.defaultValue();
+			if (value instanceof String) {
+				if (((String) value).isEmpty()) {
+					return "(none)";
+				}
+				return "\"" + value + "\"";
 			}
-			return "\"" + value + "\"";
+			return value == null ? "(none)" : value.toString();
 		}
-
-		return value == null ? "(none)" : value.toString();
 	}
 
 	private static String escapeCharacters(String value) {
@@ -328,9 +328,9 @@ public class ConfigOptionsDocGenerator {
 		}
 	}
 
-	private static class OptionWithMetaInfo {
-		private final ConfigOption<?> option;
-		private final Field field;
+	static class OptionWithMetaInfo {
+		final ConfigOption<?> option;
+		final Field field;
 
 		public OptionWithMetaInfo(ConfigOption<?> option, Field field) {
 			this.option = option;

http://git-wip-us.apache.org/repos/asf/flink/blob/a904964b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigDocsCompletenessChecker.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigDocsCompletenessChecker.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigDocsCompletenessChecker.java
deleted file mode 100644
index f0816f3..0000000
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigDocsCompletenessChecker.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.docs.configuration;
-
-import org.apache.flink.configuration.ConfigConstants;
-
-import org.apache.commons.io.FileUtils;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-/**
- * A small utility that collects all config keys that are not described in
- * the configuration reference documentation.
- */
-public class ConfigDocsCompletenessChecker {
-
-	public static void main(String[] args) throws Exception {
-
-		String configFileContents = FileUtils.readFileToString(new File("docs/setup/config.md"));
-		Field[] fields = ConfigConstants.class.getFields();
-
-		for (Field field : fields) {
-			if (Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class) && !field.getName().startsWith("DEFAULT")) {
-				Object val = field.get(null);
-				if (!configFileContents.contains((String) val)) {
-					System.out.println("++++ " + val + " is not mentioned in the configuration file!!!");
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a904964b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessTest.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessTest.java
new file mode 100644
index 0000000..40a48b8
--- /dev/null
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.docs.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import org.jsoup.Jsoup;
+import org.jsoup.nodes.Document;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.LOCATIONS;
+import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.extractConfigOptions;
+import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.processConfigOptions;
+import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.stringifyDefault;
+
+/**
+ * This test verifies that all {@link ConfigOption ConfigOptions} in the configured
+ * {@link ConfigOptionsDocGenerator#LOCATIONS locations} are documented and well-defined (i.e. no 2 options exist for
+ * the same key with different descriptions/default values), and that the documentation does not refer to non-existent
+ * options.
+ */
+public class ConfigOptionsDocsCompletenessTest {
+
+	@Test
+	public void testDocsCompleteness() throws IOException, ClassNotFoundException {
+		Map<String, DocumentedOption> documentedOptions = parseDocumentedOptions();
+		Map<String, ExistingOption> existingOptions = findExistingOptions();
+
+		final Collection<String> problems = new ArrayList<>(0);
+
+		// first check that all existing options are properly documented
+		existingOptions.forEach((key, supposedState) -> {
+			DocumentedOption documentedState = documentedOptions.remove(key);
+
+			// if nothing matches the docs for this option are up-to-date
+			if (documentedState == null) {
+				// option is not documented at all
+				problems.add("Option " + supposedState.key + " in " + supposedState.containingClass + " is not documented.");
+			} else if (!supposedState.defaultValue.equals(documentedState.defaultValue)) {
+				// default is outdated
+				problems.add("Documented default of " + supposedState.key + " in " + supposedState.containingClass +
+					" is outdated. Expected: " + supposedState.defaultValue + " Actual: " + documentedState.defaultValue);
+			} else if (!supposedState.description.equals(documentedState.description)) {
+				// description is outdated
+				problems.add("Documented description of " + supposedState.key + " in " + supposedState.containingClass +
+					" is outdated.");
+			}
+		});
+
+		// documentation contains an option that no longer exists
+		if (!documentedOptions.isEmpty()) {
+			for (DocumentedOption documentedOption : documentedOptions.values()) {
+				problems.add("Documented option " + documentedOption.key + " does not exist.");
+			}
+		}
+
+		if (!problems.isEmpty()) {
+			StringBuilder sb = new StringBuilder("Documentation is outdated, please regenerate it according to the" +
+				" instructions in flink-docs/README.md.");
+			sb.append(System.lineSeparator());
+			sb.append("\tProblems:");
+			for (String problem : problems) {
+				sb.append(System.lineSeparator());
+				sb.append("\t\t");
+				sb.append(problem);
+			}
+			Assert.fail(sb.toString());
+		}
+	}
+
+	private static Map<String, DocumentedOption> parseDocumentedOptions() throws IOException {
+		Path includeFolder = Paths.get("..", "docs", "_includes", "generated").toAbsolutePath();
+		return Files.list(includeFolder)
+			.filter(path -> path.getFileName().toString().contains("configuration"))
+			.flatMap(file -> {
+				try {
+					return parseDocumentedOptionsFromFile(file).stream();
+				} catch (IOException ignored) {
+					return Stream.empty();
+				}
+			})
+			.collect(Collectors.toMap(option -> option.key, option -> option, (option1, option2) -> {
+				if (option1.equals(option2)) {
+					// we allow multiple instances of ConfigOptions with the same key if they are identical
+					return option1;
+				} else {
+					// found a ConfigOption pair with the same key that aren't equal
+					// we fail here outright as this is not a documentation-completeness problem
+					if (!option1.defaultValue.equals(option2.defaultValue)) {
+						throw new AssertionError("Documentation contains distinct defaults for " +
+							option1.key + " in " + option1.containingFile + " and " + option2.containingFile + '.');
+					} else {
+						throw new AssertionError("Documentation contains distinct descriptions for " +
+							option1.key + " in " + option1.containingFile + " and " + option2.containingFile + '.');
+					}
+				}
+			}));
+	}
+
+	private static Collection<DocumentedOption> parseDocumentedOptionsFromFile(Path file) throws IOException {
+		Document document = Jsoup.parse(file.toFile(), StandardCharsets.UTF_8.name());
+		return document.getElementsByTag("table").stream()
+			.map(element -> element.getElementsByTag("tbody").get(0))
+			.flatMap(element -> element.getElementsByTag("tr").stream())
+			.map(tableRow -> {
+				String key = tableRow.child(0).text();
+				String defaultValue = tableRow.child(1).text();
+				String description = tableRow.child(2).text();
+				return new DocumentedOption(key, defaultValue, description, file.getName(file.getNameCount() - 1));
+			})
+			.collect(Collectors.toList());
+	}
+
+	private static Map<String, ExistingOption> findExistingOptions() throws IOException, ClassNotFoundException {
+		Map<String, ExistingOption> existingOptions = new HashMap<>(32);
+
+		for (OptionsClassLocation location : LOCATIONS) {
+			processConfigOptions("..", location.getModule(), location.getPackage(), optionsClass -> {
+				List<ConfigOptionsDocGenerator.OptionWithMetaInfo> configOptions = extractConfigOptions(optionsClass);
+				for (ConfigOptionsDocGenerator.OptionWithMetaInfo option : configOptions) {
+					String key = option.option.key();
+					String defaultValue = stringifyDefault(option);
+					String description = option.option.description();
+					ExistingOption duplicate = existingOptions.put(key, new ExistingOption(key, defaultValue, description, optionsClass));
+					if (duplicate != null) {
+						// multiple documented options have the same key
+						// we fail here outright as this is not a documentation-completeness problem
+						if (!(duplicate.description.equals(description))) {
+							throw new AssertionError("Ambiguous option " + key + " due to distinct descriptions.");
+						} else if (!duplicate.defaultValue.equals(defaultValue)) {
+							throw new AssertionError("Ambiguous option " + key + " due to distinct default values (" + defaultValue + " vs " + duplicate.defaultValue + ").");
+						}
+					}
+				}
+			});
+		}
+
+		return existingOptions;
+	}
+
+	private static final class ExistingOption extends Option {
+
+		private final Class<?> containingClass;
+
+		private ExistingOption(String key, String defaultValue, String description, Class<?> containingClass) {
+			super(key, defaultValue, description);
+			this.containingClass = containingClass;
+		}
+	}
+
+	private static final class DocumentedOption extends Option {
+
+		private final Path containingFile;
+
+		private DocumentedOption(String key, String defaultValue, String description, Path containingFile) {
+			super(key, defaultValue, description);
+			this.containingFile = containingFile;
+		}
+	}
+
+	private abstract static class Option {
+		protected final String key;
+		protected final String defaultValue;
+		protected final String description;
+
+		private Option(String key, String defaultValue, String description) {
+			this.key = key;
+			this.defaultValue = defaultValue;
+			this.description = description;
+		}
+
+		@Override
+		public int hashCode() {
+			return key.hashCode() + defaultValue.hashCode() + description.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (!(obj instanceof Option)) {
+				return false;
+			}
+
+			Option other = (Option) obj;
+
+			return this.key.equals(other.key)
+				&& this.defaultValue.equals(other.defaultValue)
+				&& this.description.equals(other.description);
+		}
+
+		@Override
+		public String toString() {
+			return "Option(key=" + key + ", default=" + defaultValue + ", description=" + description + ')';
+		}
+	}
+}