You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/03 14:16:10 UTC

[flink] 05/10: [FLINK-15824][docs] (follow-up) Simple improvements/cleanups on @SectionOption

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c8ade873c841c083d2f705e1beb4bb55eb4311e0
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Jan 29 20:37:52 2020 +0100

    [FLINK-15824][docs] (follow-up) Simple improvements/cleanups on @SectionOption
    
      - Rename '@SectionOption' to '@Section' for brevity
    
      - Rename '@Section.sections()' to '@Section.value()'
        That way, the method name can be skipped when only sections (no positions) are specified,
        which should be the common case after config documentation rework is complete.
    
      - Adjust test for @Section annotation
---
 .../flink/annotation/docs/Documentation.java       |   6 +-
 .../flink/configuration/CheckpointingOptions.java  |  18 ++--
 .../apache/flink/configuration/CoreOptions.java    |   6 +-
 .../configuration/HighAvailabilityOptions.java     |  12 +--
 .../flink/configuration/JobManagerOptions.java     |   6 +-
 .../flink/configuration/SecurityOptions.java       |  12 +--
 .../flink/configuration/TaskManagerOptions.java    |  18 ++--
 .../configuration/ConfigOptionsDocGenerator.java   |  12 +--
 .../ConfigOptionsDocGeneratorTest.java             |  48 +++++++--
 .../ConfigOptionsDocsCompletenessITCase.java       | 118 ++++++++++++---------
 .../docs/configuration/data/TestCommonOptions.java |   9 +-
 11 files changed, 161 insertions(+), 104 deletions(-)

diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index a1e142a..e6c507f 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -44,13 +44,13 @@ public final class Documentation {
 	 * Annotation used on config option fields to include them in specific sections. Sections are groups of options
 	 * that are aggregated across option classes, with each group being placed into a dedicated file.
 	 *
-	 * <p>The {@link SectionOption#position()} argument controls the position in the generated table, with lower values
+	 * <p>The {@link Section#position()} argument controls the position in the generated table, with lower values
 	 * being placed at the top. Fields with the same position are sorted alphabetically by key.
 	 */
 	@Target(ElementType.FIELD)
 	@Retention(RetentionPolicy.RUNTIME)
 	@Internal
-	public @interface SectionOption {
+	public @interface Section {
 		int POSITION_MEMORY = 10;
 		int POSITION_PARALLELISM_SLOTS = 20;
 		int POSITION_FAULT_TOLERANCE = 30;
@@ -62,7 +62,7 @@ public final class Documentation {
 		/**
 		 * The sections in the config docs where this option should be included.
 		 */
-		String[] sections() default {};
+		String[] value() default {};
 
 		/**
 		 * The relative position of the option in its section.
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 03284ca..566bac6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -31,9 +31,9 @@ public class CheckpointingOptions {
 	// ------------------------------------------------------------------------
 
 	/** The state backend to be used to store and checkpoint state. */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_FAULT_TOLERANCE)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_FAULT_TOLERANCE)
 	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
 			.key("state.backend")
 			.noDefaultValue()
@@ -105,9 +105,9 @@ public class CheckpointingOptions {
 
 	/** The default directory for savepoints. Used by the state backends that write
 	 * savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_FAULT_TOLERANCE)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_FAULT_TOLERANCE)
 	public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
 			.key("state.savepoints.dir")
 			.noDefaultValue()
@@ -117,9 +117,9 @@ public class CheckpointingOptions {
 
 	/** The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem.
 	 * The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).*/
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_FAULT_TOLERANCE)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_FAULT_TOLERANCE)
 	public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
 			.key("state.checkpoints.dir")
 			.noDefaultValue()
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 2d7bc55..2449b4e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -250,9 +250,9 @@ public class CoreOptions {
 	//  program
 	// ------------------------------------------------------------------------
 
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_PARALLELISM_SLOTS)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_PARALLELISM_SLOTS)
 	public static final ConfigOption<Integer> DEFAULT_PARALLELISM = ConfigOptions
 		.key("parallelism.default")
 		.defaultValue(1)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index b6c0f02..0087878 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -45,9 +45,9 @@ public class HighAvailabilityOptions {
 	 * To enable high-availability, set this mode to "ZOOKEEPER".
 	 * Can also be set to FQN of HighAvailability factory class.
 	 */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_HIGH_AVAILABILITY)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_HIGH_AVAILABILITY)
 	public static final ConfigOption<String> HA_MODE =
 			key("high-availability")
 			.defaultValue("NONE")
@@ -69,9 +69,9 @@ public class HighAvailabilityOptions {
 	/**
 	 * File system path (URI) where Flink persists metadata in high-availability setups.
 	 */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_HIGH_AVAILABILITY)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_HIGH_AVAILABILITY)
 	public static final ConfigOption<String> HA_STORAGE_PATH =
 			key("high-availability.storageDir")
 			.noDefaultValue()
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index ef49421..47755e2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -79,9 +79,9 @@ public class JobManagerOptions {
 	/**
 	 * JVM heap size for the JobManager with memory size.
 	 */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_MEMORY)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_MEMORY)
 	public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
 		key("jobmanager.heap.size")
 		.defaultValue("1024m")
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 5235dcf..8297cfd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -106,9 +106,9 @@ public class SecurityOptions {
 	/**
 	 * Enable SSL for internal communication (akka rpc, netty data transport, blob server).
 	 */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_SECURITY)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_SECURITY)
 	public static final ConfigOption<Boolean> SSL_INTERNAL_ENABLED =
 			key("security.ssl.internal.enabled")
 			.defaultValue(false)
@@ -119,9 +119,9 @@ public class SecurityOptions {
 	/**
 	 * Enable SSL for external REST endpoints.
 	 */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_SECURITY)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_SECURITY)
 	public static final ConfigOption<Boolean> SSL_REST_ENABLED =
 			key("security.ssl.rest.enabled")
 			.defaultValue(false)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index cf532a4..05ad75c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -165,9 +165,9 @@ public class TaskManagerOptions {
 	/**
 	 * The config parameter defining the number of task slots of a task manager.
 	 */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_PARALLELISM_SLOTS)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_PARALLELISM_SLOTS)
 	public static final ConfigOption<Integer> NUM_TASK_SLOTS =
 		key("taskmanager.numberOfTaskSlots")
 			.intType()
@@ -254,9 +254,9 @@ public class TaskManagerOptions {
 	/**
 	 * Total Process Memory size for the TaskExecutors.
 	 */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_MEMORY)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_MEMORY)
 	public static final ConfigOption<MemorySize> TOTAL_PROCESS_MEMORY =
 		key("taskmanager.memory.process.size")
 			.memoryType()
@@ -270,9 +270,9 @@ public class TaskManagerOptions {
 	/**
 	 * Total Flink Memory size for the TaskExecutors.
 	 */
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = Documentation.SectionOption.POSITION_MEMORY)
+	@Documentation.Section(
+		value = {Documentation.Section.SECTION_COMMON},
+		position = Documentation.Section.POSITION_MEMORY)
 	public static final ConfigOption<MemorySize> TOTAL_FLINK_MEMORY =
 		key("taskmanager.memory.flink.size")
 			.memoryType()
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 1694620..7e5f9c4 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -101,7 +101,7 @@ public class ConfigOptionsDocGenerator {
 	 * every {@link ConfigOption}.
 	 *
 	 * <p>One additional table is generated containing all {@link ConfigOption ConfigOptions} that are annotated with
-	 * {@link Documentation.SectionOption}.
+	 * {@link Documentation.Section}.
 	 *
 	 * @param args
 	 *  [0] output directory for the generated files
@@ -127,12 +127,12 @@ public class ConfigOptionsDocGenerator {
 
 		Map<String, List<OptionWithMetaInfo>> optionsGroupedBySection = allSectionOptions.stream()
 			.flatMap(option -> {
-				final String[] sections = option.field.getAnnotation(Documentation.SectionOption.class).sections();
+				final String[] sections = option.field.getAnnotation(Documentation.Section.class).value();
 				if (sections.length == 0) {
 					throw new RuntimeException(String.format(
 						"Option %s is annotated with %s but the list of sections is empty.",
 						option.option.key(),
-						Documentation.SectionOption.class.getSimpleName()));
+						Documentation.Section.class.getSimpleName()));
 				}
 
 				return Arrays.stream(sections).map(section -> Tuple2.of(section, option));
@@ -142,8 +142,8 @@ public class ConfigOptionsDocGenerator {
 		optionsGroupedBySection.forEach(
 			(section, options) -> {
 				options.sort((o1, o2) -> {
-					int position1 = o1.field.getAnnotation(Documentation.SectionOption.class).position();
-					int position2 = o2.field.getAnnotation(Documentation.SectionOption.class).position();
+					int position1 = o1.field.getAnnotation(Documentation.Section.class).position();
+					int position2 = o2.field.getAnnotation(Documentation.Section.class).position();
 					if (position1 == position2) {
 						return o1.option.key().compareTo(o2.option.key());
 					} else {
@@ -169,7 +169,7 @@ public class ConfigOptionsDocGenerator {
 	private static Collection<OptionWithMetaInfo> findSectionOptions(String rootDir, String module, String packageName, String pathPrefix) throws IOException, ClassNotFoundException {
 		Collection<OptionWithMetaInfo> commonOptions = new ArrayList<>(32);
 		processConfigOptions(rootDir, module, packageName, pathPrefix, optionsClass -> extractConfigOptions(optionsClass).stream()
-			.filter(optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.SectionOption.class) != null)
+			.filter(optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.Section.class) != null)
 			.forEachOrdered(commonOptions::add));
 		return commonOptions;
 	}
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
index 1e9731c..c0a88aa 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
@@ -34,9 +34,9 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -423,8 +423,8 @@ public class ConfigOptionsDocGeneratorTest {
 	}
 
 	@Test
-	public void testCommonOptions() throws IOException, ClassNotFoundException {
-		final String projectRootDir = System.getProperty("rootDir");
+	public void testSections() throws IOException, ClassNotFoundException {
+		final String projectRootDir = getProjectRootDir();
 		final String outputDirectory = TMP.newFolder().getAbsolutePath();
 
 		final OptionsClassLocation[] locations = new OptionsClassLocation[] {
@@ -434,7 +434,7 @@ public class ConfigOptionsDocGeneratorTest {
 		ConfigOptionsDocGenerator.generateCommonSection(projectRootDir, outputDirectory, locations, "src/test/java");
 		Formatter formatter = new HtmlFormatter();
 
-		String expected =
+		String expected1 =
 			"<table class=\"table table-bordered\">\n" +
 			"    <thead>\n" +
 			"        <tr>\n" +
@@ -460,9 +460,34 @@ public class ConfigOptionsDocGeneratorTest {
 			"    </tbody>\n" +
 			"</table>\n";
 
-		String output = FileUtils.readFile(Paths.get(outputDirectory, ConfigOptionsDocGenerator.getSectionFileName("common")).toFile(), StandardCharsets.UTF_8.name());
+		String expected2 =
+			"<table class=\"table table-bordered\">\n" +
+				"    <thead>\n" +
+				"        <tr>\n" +
+				"            <th class=\"text-left\" style=\"width: 20%\">Key</th>\n" +
+				"            <th class=\"text-left\" style=\"width: 15%\">Default</th>\n" +
+				"            <th class=\"text-left\" style=\"width: 10%\">Type</th>\n" +
+				"            <th class=\"text-left\" style=\"width: 55%\">Description</th>\n" +
+				"        </tr>\n" +
+				"    </thead>\n" +
+				"    <tbody>\n" +
+				"        <tr>\n" +
+				"            <td><h5>" + TestCommonOptions.COMMON_OPTION.key() + "</h5></td>\n" +
+				"            <td style=\"word-wrap: break-word;\">" + TestCommonOptions.COMMON_OPTION.defaultValue() + "</td>\n" +
+				"            <td>Integer</td>\n" +
+				"            <td>" + formatter.format(TestCommonOptions.COMMON_OPTION.description()) + "</td>\n" +
+				"        </tr>\n" +
+				"    </tbody>\n" +
+				"</table>\n";
+
+		final String fileName1 = ConfigOptionsDocGenerator.getSectionFileName(TestCommonOptions.SECTION_1);
+		final String fileName2 = ConfigOptionsDocGenerator.getSectionFileName(TestCommonOptions.SECTION_2);
 
-		assertEquals(expected, output);
+		final String output1 = FileUtils.readFile(new File(outputDirectory, fileName1), StandardCharsets.UTF_8.name());
+		final String output2 = FileUtils.readFile(new File(outputDirectory, fileName2), StandardCharsets.UTF_8.name());
+
+		assertEquals(expected1, output1);
+		assertEquals(expected2, output2);
 	}
 
 	static class TestConfigGroupWithExclusion {
@@ -509,4 +534,15 @@ public class ConfigOptionsDocGeneratorTest {
 
 		assertEquals(expectedTable, htmlTable);
 	}
+
+	static String getProjectRootDir() {
+		final String dirFromProperty = System.getProperty("rootDir");
+		if (dirFromProperty != null) {
+			return dirFromProperty;
+		}
+
+		// to make this work in the IDE use a default fallback
+		final String currentDir = System.getProperty("user.dir");
+		return new File(currentDir).getParent();
+	}
 }
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
index bd6316e..49d03eb 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.docs.configuration;
 
 import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.description.Formatter;
 import org.apache.flink.configuration.description.HtmlFormatter;
@@ -36,10 +37,12 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -63,59 +66,64 @@ public class ConfigOptionsDocsCompletenessITCase {
 	private static final Formatter htmlFormatter = new HtmlFormatter();
 
 	@Test
-	public void testCommonSectionCompleteness() throws IOException, ClassNotFoundException {
-		Map<String, List<DocumentedOption>> documentedOptions = parseDocumentedCommonOptions();
-		Map<String, List<ExistingOption>> existingOptions = findExistingOptions(
-			optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.SectionOption.class) != null);
+	public void testCompleteness() throws IOException, ClassNotFoundException {
+		final Map<String, List<DocumentedOption>> documentedOptions = parseDocumentedOptions();
+		final Map<String, List<ExistingOption>> existingOptions = findExistingOptions(ignored -> true);
 
-		assertExistingOptionsAreWellDefined(existingOptions);
+		final Map<String, List<ExistingOption>> existingDeduplicated = checkWellDefinedAndDeduplicate(existingOptions);
 
-		compareDocumentedAndExistingOptions(documentedOptions, existingOptions);
+		compareDocumentedAndExistingOptions(documentedOptions, existingDeduplicated);
 	}
 
-	@Test
-	public void testFullReferenceCompleteness() throws IOException, ClassNotFoundException {
-		Map<String, List<DocumentedOption>> documentedOptions = parseDocumentedOptions();
-		Map<String, List<ExistingOption>> existingOptions = findExistingOptions(ignored -> true);
+	private static Map<String, List<ExistingOption>> checkWellDefinedAndDeduplicate(Map<String, List<ExistingOption>> allOptions) {
+		return allOptions.entrySet().stream()
+			.map((entry) -> {
+				final List<ExistingOption> existingOptions = entry.getValue();
+				final List<ExistingOption> consolidated;
 
-		assertExistingOptionsAreWellDefined(existingOptions);
+				if (existingOptions.stream().allMatch(option -> option.isSuffixOption)) {
+					consolidated = existingOptions;
+				}
+				else {
+					Optional<ExistingOption> deduped = existingOptions.stream()
+						.reduce((option1, option2) -> {
+							if (option1.equals(option2)) {
+								// we allow multiple instances of ConfigOptions with the same key if they are identical
+								return option1;
+							} else {
+								// found a ConfigOption pair with the same key that aren't equal
+								// we fail here outright as this is not a documentation-completeness problem
+								if (!option1.defaultValue.equals(option2.defaultValue)) {
+									String errorMessage = String.format(
+										"Ambiguous option %s due to distinct default values (%s (in %s) vs %s (in %s)).",
+										option1.key,
+										option1.defaultValue,
+										option1.containingClass.getSimpleName(),
+										option2.defaultValue,
+										option2.containingClass.getSimpleName());
+									throw new AssertionError(errorMessage);
+								} else {
+									String errorMessage = String.format(
+										"Ambiguous option %s due to distinct descriptions (%s vs %s).",
+										option1.key,
+										option1.containingClass.getSimpleName(),
+										option2.containingClass.getSimpleName());
+									throw new AssertionError(errorMessage);
+								}
+							}
+						});
+					consolidated = Collections.singletonList(deduped.get());
+				}
 
-		compareDocumentedAndExistingOptions(documentedOptions, existingOptions);
+				return new Tuple2<>(entry.getKey(), consolidated);
+			})
+			.collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1));
 	}
 
-	private static void assertExistingOptionsAreWellDefined(Map<String, List<ExistingOption>> allOptions) {
-		allOptions.values().stream()
-			.filter(options -> !options.stream().allMatch(option -> option.isSuffixOption))
-			.forEach(options -> options.stream()
-				.reduce((option1, option2) -> {
-					if (option1.equals(option2)) {
-						// we allow multiple instances of ConfigOptions with the same key if they are identical
-						return option1;
-					} else {
-						// found a ConfigOption pair with the same key that aren't equal
-						// we fail here outright as this is not a documentation-completeness problem
-						if (!option1.defaultValue.equals(option2.defaultValue)) {
-							String errorMessage = String.format(
-								"Ambiguous option %s due to distinct default values (%s (in %s) vs %s (in %s)).",
-								option1.key,
-								option1.defaultValue,
-								option1.containingClass.getSimpleName(),
-								option2.defaultValue,
-								option2.containingClass.getSimpleName());
-							throw new AssertionError(errorMessage);
-						} else {
-							String errorMessage = String.format(
-								"Ambiguous option %s due to distinct descriptions (%s vs %s).",
-								option1.key,
-								option1.containingClass.getSimpleName(),
-								option2.containingClass.getSimpleName());
-							throw new AssertionError(errorMessage);
-						}
-					}
-				}));
-	}
+	private static void compareDocumentedAndExistingOptions(
+			Map<String, List<DocumentedOption>> documentedOptions,
+			Map<String, List<ExistingOption>> existingOptions) {
 
-	private static void compareDocumentedAndExistingOptions(Map<String, List<DocumentedOption>> documentedOptions, Map<String, List<ExistingOption>> existingOptions) {
 		final Collection<String> problems = new ArrayList<>(0);
 
 		// first check that all existing options are properly documented
@@ -130,7 +138,7 @@ public class ConfigOptionsDocsCompletenessITCase {
 					final Iterator<DocumentedOption> candidates = documentedState.iterator();
 
 					boolean matchFound = false;
-					while (candidates.hasNext() && !matchFound) {
+					while (candidates.hasNext()) {
 						DocumentedOption candidate = candidates.next();
 						if (supposedState.defaultValue.equals(candidate.defaultValue) && supposedState.description.equals(candidate.description)) {
 							matchFound = true;
@@ -138,6 +146,10 @@ public class ConfigOptionsDocsCompletenessITCase {
 						}
 					}
 
+					if (documentedState.isEmpty()) {
+						documentedOptions.remove(key);
+					}
+
 					if (!matchFound) {
 						problems.add(String.format(
 							"Documentation of %s in %s is outdated. Expected: default=(%s) description=(%s).",
@@ -169,15 +181,22 @@ public class ConfigOptionsDocsCompletenessITCase {
 	}
 
 	private static Map<String, List<DocumentedOption>> parseDocumentedCommonOptions() throws IOException {
-		Path commonSection = Paths.get(System.getProperty("rootDir"), "docs", "_includes", "generated", COMMON_SECTION_FILE_NAME);
+		final String rootDir = ConfigOptionsDocGeneratorTest.getProjectRootDir();
+
+		Path commonSection = Paths.get(rootDir, "docs", "_includes", "generated", COMMON_SECTION_FILE_NAME);
 		return parseDocumentedOptionsFromFile(commonSection).stream()
 			.collect(Collectors.groupingBy(option -> option.key, Collectors.toList()));
 	}
 
 	private static Map<String, List<DocumentedOption>> parseDocumentedOptions() throws IOException {
-		Path includeFolder = Paths.get(System.getProperty("rootDir"), "docs", "_includes", "generated").toAbsolutePath();
+		final String rootDir = ConfigOptionsDocGeneratorTest.getProjectRootDir();
+
+		Path includeFolder = Paths.get(rootDir, "docs", "_includes", "generated").toAbsolutePath();
 		return Files.list(includeFolder)
-			.filter(path -> path.getFileName().toString().contains("configuration"))
+			.filter((path) -> {
+				final String filename = path.getFileName().toString();
+				return filename.endsWith("configuration.html") || filename.endsWith("_section.html");
+			})
 			.flatMap(file -> {
 				try {
 					return parseDocumentedOptionsFromFile(file).stream();
@@ -216,10 +235,11 @@ public class ConfigOptionsDocsCompletenessITCase {
 	}
 
 	private static Map<String, List<ExistingOption>> findExistingOptions(Predicate<ConfigOptionsDocGenerator.OptionWithMetaInfo> predicate) throws IOException, ClassNotFoundException {
+		final String rootDir = ConfigOptionsDocGeneratorTest.getProjectRootDir();
 		final Collection<ExistingOption> existingOptions = new ArrayList<>();
 
 		for (OptionsClassLocation location : LOCATIONS) {
-			processConfigOptions(System.getProperty("rootDir"), location.getModule(), location.getPackage(), DEFAULT_PATH_PREFIX, optionsClass ->
+			processConfigOptions(rootDir, location.getModule(), location.getPackage(), DEFAULT_PATH_PREFIX, optionsClass ->
 				extractConfigOptions(optionsClass)
 					.stream()
 					.filter(predicate)
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/data/TestCommonOptions.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/data/TestCommonOptions.java
index 200b84d..b5aef9e 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/data/TestCommonOptions.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/data/TestCommonOptions.java
@@ -28,7 +28,10 @@ import org.apache.flink.configuration.ConfigOptions;
 @SuppressWarnings("unused") // this class is only accessed reflectively
 public class TestCommonOptions {
 
-	@Documentation.SectionOption(sections = {Documentation.SectionOption.SECTION_COMMON})
+	public static final String SECTION_1 = "test_A";
+	public static final String SECTION_2 = "other";
+
+	@Documentation.Section({SECTION_1, SECTION_2})
 	public static final ConfigOption<Integer> COMMON_OPTION = ConfigOptions
 		.key("first.option.a")
 		.intType()
@@ -41,9 +44,7 @@ public class TestCommonOptions {
 		.noDefaultValue()
 		.withDescription("This is the description for the generic option.");
 
-	@Documentation.SectionOption(
-		sections = {Documentation.SectionOption.SECTION_COMMON},
-		position = 2)
+	@Documentation.Section(value = SECTION_1, position = 2)
 	public static final ConfigOption<Integer> COMMON_POSITIONED_OPTION = ConfigOptions
 		.key("third.option.a")
 		.intType()